mirror of https://github.com/citusdata/citus.git
Merge pull request #2391 from citusdata/truncate_utility
Add distributed locking to truncated mx tablespull/2358/head
commit
22f5af1bc3
|
@ -36,6 +36,7 @@
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/foreign_constraint.h"
|
#include "distributed/foreign_constraint.h"
|
||||||
#include "distributed/intermediate_results.h"
|
#include "distributed/intermediate_results.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
@ -50,6 +51,7 @@
|
||||||
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
|
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "distributed/policy.h"
|
#include "distributed/policy.h"
|
||||||
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
|
@ -87,6 +89,9 @@
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
|
||||||
|
|
||||||
|
|
||||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||||
|
|
||||||
|
|
||||||
|
@ -146,6 +151,9 @@ static void ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement
|
||||||
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
static void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
|
||||||
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
static void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
|
||||||
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
||||||
|
static void ProcessTruncateStatement(TruncateStmt *truncateStatement);
|
||||||
|
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
|
||||||
|
static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
|
||||||
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
||||||
static void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
|
static void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
|
||||||
static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement);
|
static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement);
|
||||||
|
@ -386,6 +394,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
if (IsA(parsetree, TruncateStmt))
|
if (IsA(parsetree, TruncateStmt))
|
||||||
{
|
{
|
||||||
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
|
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
|
||||||
|
ProcessTruncateStatement((TruncateStmt *) parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* only generate worker DDLJobs if propagation is enabled */
|
/* only generate worker DDLJobs if propagation is enabled */
|
||||||
|
@ -2921,6 +2930,147 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ProcessTruncateStatement handles distributed locking
|
||||||
|
* of truncated tables before standard utility takes over.
|
||||||
|
*
|
||||||
|
* Actual distributed truncation occurs inside truncate trigger.
|
||||||
|
*
|
||||||
|
* This is only for distributed serialization of truncate commands.
|
||||||
|
* The function assumes that there is no foreign key relation between
|
||||||
|
* non-distributed and distributed relations.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ProcessTruncateStatement(TruncateStmt *truncateStatement)
|
||||||
|
{
|
||||||
|
LockTruncatedRelationMetadataInWorkers(truncateStatement);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockTruncatedRelationMetadataInWorkers determines if distributed
|
||||||
|
* lock is necessary for truncated relations, and acquire locks.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
|
{
|
||||||
|
List *distributedRelationList = NIL;
|
||||||
|
ListCell *relationCell = NULL;
|
||||||
|
|
||||||
|
/* nothing to do if there is no metadata at worker nodes */
|
||||||
|
if (!ClusterHasKnownMetadataWorkers())
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach(relationCell, truncateStatement->relations)
|
||||||
|
{
|
||||||
|
RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
|
||||||
|
Relation relation = heap_openrv(relationRV, NoLock);
|
||||||
|
Oid relationId = RelationGetRelid(relation);
|
||||||
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
|
List *referencingTableList = NIL;
|
||||||
|
ListCell *referencingTableCell = NULL;
|
||||||
|
|
||||||
|
if (!IsDistributedTable(relationId))
|
||||||
|
{
|
||||||
|
heap_close(relation, NoLock);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (list_member_oid(distributedRelationList, relationId))
|
||||||
|
{
|
||||||
|
heap_close(relation, NoLock);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
distributedRelationList = lappend_oid(distributedRelationList, relationId);
|
||||||
|
|
||||||
|
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||||
|
Assert(cacheEntry != NULL);
|
||||||
|
|
||||||
|
referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
|
||||||
|
foreach(referencingTableCell, referencingTableList)
|
||||||
|
{
|
||||||
|
Oid referencingRelationId = lfirst_oid(referencingTableCell);
|
||||||
|
distributedRelationList = list_append_unique_oid(distributedRelationList,
|
||||||
|
referencingRelationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
heap_close(relation, NoLock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (distributedRelationList != NIL)
|
||||||
|
{
|
||||||
|
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
|
||||||
|
* for given list of relations ids. Relation id list and worker node list
|
||||||
|
* sorted so that the lock is acquired in the same order regardless of which
|
||||||
|
* node it was run on. Notice that no lock is acquired on coordinator node.
|
||||||
|
*
|
||||||
|
* Notice that the locking functions is sent to all workers regardless of if
|
||||||
|
* it has metadata or not. This is because a worker node only knows itself
|
||||||
|
* and previous workers that has metadata sync turned on. The node does not
|
||||||
|
* know about other nodes that have metadata sync turned on afterwards.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
||||||
|
{
|
||||||
|
ListCell *relationIdCell = NULL;
|
||||||
|
List *workerNodeList = ActivePrimaryNodeList();
|
||||||
|
const char *lockModeText = LockModeToLockModeText(lockMode);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We want to acquire locks in the same order accross the nodes.
|
||||||
|
* Although relation ids may change, their ordering will not.
|
||||||
|
*/
|
||||||
|
relationIdList = SortList(relationIdList, CompareOids);
|
||||||
|
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||||
|
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
foreach(relationIdCell, relationIdList)
|
||||||
|
{
|
||||||
|
Oid relationId = lfirst_oid(relationIdCell);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We only acquire distributed lock on relation if
|
||||||
|
* the relation is sync'ed between mx nodes.
|
||||||
|
*/
|
||||||
|
if (ShouldSyncTableMetadata(relationId))
|
||||||
|
{
|
||||||
|
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||||
|
StringInfo lockRelationCommand = makeStringInfo();
|
||||||
|
ListCell *workerNodeCell = NULL;
|
||||||
|
|
||||||
|
appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS,
|
||||||
|
qualifiedRelationName, lockModeText);
|
||||||
|
|
||||||
|
foreach(workerNodeCell, workerNodeList)
|
||||||
|
{
|
||||||
|
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||||
|
char *nodeName = workerNode->workerName;
|
||||||
|
int nodePort = workerNode->workerPort;
|
||||||
|
|
||||||
|
/* if local node is one of the targets, acquire the lock locally */
|
||||||
|
if (workerNode->groupId == GetLocalGroupId())
|
||||||
|
{
|
||||||
|
LockRelationOid(relationId, lockMode);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
|
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
|
||||||
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
|
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
|
||||||
|
|
|
@ -19,8 +19,7 @@
|
||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/xact.h"
|
|
||||||
#include "catalog/namespace.h"
|
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/event_trigger.h"
|
#include "commands/event_trigger.h"
|
||||||
|
@ -57,22 +56,14 @@
|
||||||
#include "utils/inval.h"
|
#include "utils/inval.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
|
||||||
#include "utils/varlena.h"
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
|
|
||||||
#define REMOTE_LOCK_MODE_FOR_TRUNCATE "ACCESS EXCLUSIVE"
|
|
||||||
|
|
||||||
|
|
||||||
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType
|
static List * ModifyMultipleShardsTaskList(Query *query, List *shardIntervalList, TaskType
|
||||||
taskType);
|
taskType);
|
||||||
static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
|
static bool ShouldExecuteTruncateStmtSequential(TruncateStmt *command);
|
||||||
static LOCKMODE LockModeTextToLockMode(const char *lockModeName);
|
|
||||||
|
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
PG_FUNCTION_INFO_V1(master_modify_multiple_shards);
|
||||||
PG_FUNCTION_INFO_V1(lock_relation_if_exists);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -212,26 +203,6 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)
|
||||||
taskList =
|
taskList =
|
||||||
ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);
|
ModifyMultipleShardsTaskList(modifyQuery, prunedShardIntervalList, taskType);
|
||||||
|
|
||||||
/*
|
|
||||||
* We lock the relation we're TRUNCATING on the other worker nodes before
|
|
||||||
* executing the truncate commands on the shards. This is necessary to prevent
|
|
||||||
* distributed deadlocks where a concurrent operation on the same table (or a
|
|
||||||
* cascading table) is executed on the other nodes.
|
|
||||||
*
|
|
||||||
* Note that we should skip the current node to prevent a self-deadlock that's why
|
|
||||||
* we use OTHER_WORKERS tag.
|
|
||||||
*/
|
|
||||||
if (truncateOperation && ShouldSyncTableMetadata(relationId))
|
|
||||||
{
|
|
||||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
|
||||||
StringInfo lockRelation = makeStringInfo();
|
|
||||||
|
|
||||||
appendStringInfo(lockRelation, LOCK_RELATION_IF_EXISTS, qualifiedRelationName,
|
|
||||||
REMOTE_LOCK_MODE_FOR_TRUNCATE);
|
|
||||||
|
|
||||||
SendCommandToWorkers(OTHER_WORKERS, lockRelation->data);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||||
{
|
{
|
||||||
affectedTupleCount =
|
affectedTupleCount =
|
||||||
|
@ -317,105 +288,3 @@ ShouldExecuteTruncateStmtSequential(TruncateStmt *command)
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* lock_relation_if_exists gets a relation name and lock mode
|
|
||||||
* and returns true if the relation exists and can be locked with
|
|
||||||
* the given lock mode. If the relation doesn't exists, the function
|
|
||||||
* return false.
|
|
||||||
*
|
|
||||||
* The relation name should be qualified with the schema name.
|
|
||||||
*
|
|
||||||
* The function errors out of the lockmode isn't defined in the PostgreSQL's
|
|
||||||
* explicit locking table.
|
|
||||||
*/
|
|
||||||
Datum
|
|
||||||
lock_relation_if_exists(PG_FUNCTION_ARGS)
|
|
||||||
{
|
|
||||||
text *relationName = PG_GETARG_TEXT_P(0);
|
|
||||||
text *lockModeText = PG_GETARG_TEXT_P(1);
|
|
||||||
|
|
||||||
Oid relationId = InvalidOid;
|
|
||||||
char *lockModeCString = text_to_cstring(lockModeText);
|
|
||||||
List *relationNameList = NIL;
|
|
||||||
RangeVar *relation = NULL;
|
|
||||||
LOCKMODE lockMode = NoLock;
|
|
||||||
|
|
||||||
/* ensure that we're in a transaction block */
|
|
||||||
RequireTransactionBlock(true, "lock_relation_if_exists");
|
|
||||||
|
|
||||||
relationId = ResolveRelationId(relationName, true);
|
|
||||||
if (!OidIsValid(relationId))
|
|
||||||
{
|
|
||||||
PG_RETURN_BOOL(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* get the lock mode */
|
|
||||||
lockMode = LockModeTextToLockMode(lockModeCString);
|
|
||||||
|
|
||||||
/* resolve relationId from passed in schema and relation name */
|
|
||||||
relationNameList = textToQualifiedNameList(relationName);
|
|
||||||
relation = makeRangeVarFromNameList(relationNameList);
|
|
||||||
|
|
||||||
/* lock the relation with the lock mode */
|
|
||||||
RangeVarGetRelid(relation, lockMode, false);
|
|
||||||
|
|
||||||
PG_RETURN_BOOL(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
|
|
||||||
* The function errors out if the input lock mode isn't defined in the PostgreSQL's
|
|
||||||
* explicit locking table.
|
|
||||||
*/
|
|
||||||
static LOCKMODE
|
|
||||||
LockModeTextToLockMode(const char *lockModeName)
|
|
||||||
{
|
|
||||||
if (pg_strncasecmp("NoLock", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
/* there is no explict call for NoLock, but keeping it here for convinience */
|
|
||||||
return NoLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("ACCESS SHARE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return AccessShareLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("ROW SHARE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return RowShareLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return RowExclusiveLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("SHARE UPDATE EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return ShareUpdateExclusiveLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("SHARE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return ShareLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("SHARE ROW EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return ShareRowExclusiveLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return ExclusiveLock;
|
|
||||||
}
|
|
||||||
else if (pg_strncasecmp("ACCESS EXCLUSIVE", lockModeName, NAMEDATALEN) == 0)
|
|
||||||
{
|
|
||||||
return AccessExclusiveLock;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
|
||||||
errmsg("unknown lock mode: %s", lockModeName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
return NoLock;
|
|
||||||
}
|
|
||||||
|
|
|
@ -59,6 +59,11 @@ SortList(List *pointerList, int (*comparisonFunction)(const void *, const void *
|
||||||
|
|
||||||
pfree(array);
|
pfree(array);
|
||||||
|
|
||||||
|
if (sortedList != NIL)
|
||||||
|
{
|
||||||
|
sortedList->type = pointerList->type;
|
||||||
|
}
|
||||||
|
|
||||||
return sortedList;
|
return sortedList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,8 @@
|
||||||
#include "c.h"
|
#include "c.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "access/xact.h"
|
||||||
|
#include "catalog/namespace.h"
|
||||||
#include "distributed/colocation_utils.h"
|
#include "distributed/colocation_utils.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
|
@ -31,7 +33,38 @@
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/shardinterval_utils.h"
|
#include "distributed/shardinterval_utils.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "distributed/version_compat.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
#include "utils/varlena.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
/* static definition and declarations */
|
||||||
|
struct LockModeToStringType
|
||||||
|
{
|
||||||
|
LOCKMODE lockMode;
|
||||||
|
const char *name;
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* list of lock mode mappings, number of items need to be kept in sync
|
||||||
|
* with lock_mode_to_string_map_count.
|
||||||
|
*/
|
||||||
|
static const struct LockModeToStringType lockmode_to_string_map[] = {
|
||||||
|
{ NoLock, "NoLock" },
|
||||||
|
{ AccessShareLock, "ACCESS SHARE" },
|
||||||
|
{ RowShareLock, "ROW SHARE" },
|
||||||
|
{ RowExclusiveLock, "ROW EXCLUSIVE" },
|
||||||
|
{ ShareUpdateExclusiveLock, "SHARE UPDATE EXCLUSIVE" },
|
||||||
|
{ ShareLock, "SHARE" },
|
||||||
|
{ ShareRowExclusiveLock, "SHARE ROW EXCLUSIVE" },
|
||||||
|
{ ExclusiveLock, "EXCLUSIVE" },
|
||||||
|
{ AccessExclusiveLock, "ACCESS EXCLUSIVE" }
|
||||||
|
};
|
||||||
|
static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) /
|
||||||
|
sizeof(lockmode_to_string_map[0]);
|
||||||
|
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
|
@ -45,6 +78,7 @@ static bool IsFirstWorkerNode();
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(lock_shard_metadata);
|
PG_FUNCTION_INFO_V1(lock_shard_metadata);
|
||||||
PG_FUNCTION_INFO_V1(lock_shard_resources);
|
PG_FUNCTION_INFO_V1(lock_shard_resources);
|
||||||
|
PG_FUNCTION_INFO_V1(lock_relation_if_exists);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -618,3 +652,115 @@ LockPartitionRelations(Oid relationId, LOCKMODE lockMode)
|
||||||
LockRelationOid(partitionRelationId, lockMode);
|
LockRelationOid(partitionRelationId, lockMode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockModeTextToLockMode gets a lockMode name and returns its corresponding LOCKMODE.
|
||||||
|
* The function errors out if the input lock mode isn't defined in the PostgreSQL's
|
||||||
|
* explicit locking table.
|
||||||
|
*/
|
||||||
|
LOCKMODE
|
||||||
|
LockModeTextToLockMode(const char *lockModeName)
|
||||||
|
{
|
||||||
|
LOCKMODE lockMode = -1;
|
||||||
|
|
||||||
|
int lockIndex = 0;
|
||||||
|
for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++)
|
||||||
|
{
|
||||||
|
const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex;
|
||||||
|
if (pg_strncasecmp(lockMap->name, lockModeName, NAMEDATALEN) == 0)
|
||||||
|
{
|
||||||
|
lockMode = lockMap->lockMode;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we could not find the lock mode we are looking for */
|
||||||
|
if (lockMode == -1)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||||
|
errmsg("unknown lock mode: %s", lockModeName)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return lockMode;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* LockModeToLockModeText gets a lockMode enum and returns its corresponding text
|
||||||
|
* representation.
|
||||||
|
* The function errors out if the input lock mode isn't defined in the PostgreSQL's
|
||||||
|
* explicit locking table.
|
||||||
|
*/
|
||||||
|
const char *
|
||||||
|
LockModeToLockModeText(LOCKMODE lockMode)
|
||||||
|
{
|
||||||
|
const char *lockModeText = NULL;
|
||||||
|
|
||||||
|
int lockIndex = 0;
|
||||||
|
for (lockIndex = 0; lockIndex < lock_mode_to_string_map_count; lockIndex++)
|
||||||
|
{
|
||||||
|
const struct LockModeToStringType *lockMap = lockmode_to_string_map + lockIndex;
|
||||||
|
if (lockMode == lockMap->lockMode)
|
||||||
|
{
|
||||||
|
lockModeText = lockMap->name;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* we could not find the lock mode we are looking for */
|
||||||
|
if (lockModeText == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR,
|
||||||
|
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||||
|
errmsg("unknown lock mode enum: %d", (int) lockMode)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return lockModeText;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* lock_relation_if_exists gets a relation name and lock mode
|
||||||
|
* and returns true if the relation exists and can be locked with
|
||||||
|
* the given lock mode. If the relation doesn't exists, the function
|
||||||
|
* return false.
|
||||||
|
*
|
||||||
|
* The relation name should be qualified with the schema name.
|
||||||
|
*
|
||||||
|
* The function errors out of the lockmode isn't defined in the PostgreSQL's
|
||||||
|
* explicit locking table.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
lock_relation_if_exists(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
text *relationName = PG_GETARG_TEXT_P(0);
|
||||||
|
text *lockModeText = PG_GETARG_TEXT_P(1);
|
||||||
|
Oid relationId = InvalidOid;
|
||||||
|
char *lockModeCString = text_to_cstring(lockModeText);
|
||||||
|
List *relationNameList = NIL;
|
||||||
|
RangeVar *relation = NULL;
|
||||||
|
LOCKMODE lockMode = NoLock;
|
||||||
|
|
||||||
|
/* ensure that we're in a transaction block */
|
||||||
|
RequireTransactionBlock(true, "lock_relation_if_exists");
|
||||||
|
|
||||||
|
relationId = ResolveRelationId(relationName, true);
|
||||||
|
if (!OidIsValid(relationId))
|
||||||
|
{
|
||||||
|
PG_RETURN_BOOL(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* get the lock mode */
|
||||||
|
lockMode = LockModeTextToLockMode(lockModeCString);
|
||||||
|
|
||||||
|
/* resolve relationId from passed in schema and relation name */
|
||||||
|
relationNameList = textToQualifiedNameList(relationName);
|
||||||
|
relation = makeRangeVarFromNameList(relationNameList);
|
||||||
|
|
||||||
|
/* lock the relation with the lock mode */
|
||||||
|
RangeVarGetRelid(relation, lockMode, false);
|
||||||
|
|
||||||
|
PG_RETURN_BOOL(true);
|
||||||
|
}
|
||||||
|
|
|
@ -97,4 +97,8 @@ extern void LockPartitionRelations(Oid relationId, LOCKMODE lockMode);
|
||||||
/* Lock parent table's colocated shard resource */
|
/* Lock parent table's colocated shard resource */
|
||||||
extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode);
|
extern void LockParentShardResourceIfPartition(uint64 shardId, LOCKMODE lockMode);
|
||||||
|
|
||||||
|
/* Lock mode translation between text and enum */
|
||||||
|
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
|
||||||
|
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
|
||||||
|
|
||||||
#endif /* RESOURCE_LOCK_H */
|
#endif /* RESOURCE_LOCK_H */
|
||||||
|
|
Loading…
Reference in New Issue