mirror of https://github.com/citusdata/citus.git
Naive lock propagation
parent
b092a1a496
commit
9312eda040
|
@ -39,16 +39,11 @@
|
||||||
#include "utils/regproc.h"
|
#include "utils/regproc.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
|
||||||
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, '%s');"
|
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations for unsupported command checks */
|
/* Local functions forward declarations for unsupported command checks */
|
||||||
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
||||||
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
|
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
|
||||||
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
|
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
|
||||||
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
|
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
|
||||||
static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
|
|
||||||
static List * TruncateTaskList(Oid relationId);
|
static List * TruncateTaskList(Oid relationId);
|
||||||
|
|
||||||
|
|
||||||
|
@ -402,74 +397,8 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
|
|
||||||
if (distributedRelationList != NIL)
|
if (distributedRelationList != NIL)
|
||||||
{
|
{
|
||||||
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
|
bool nowait = false;
|
||||||
}
|
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock,
|
||||||
}
|
nowait);
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
|
|
||||||
* for given list of relations ids. Relation id list and worker node list
|
|
||||||
* sorted so that the lock is acquired in the same order regardless of which
|
|
||||||
* node it was run on. Notice that no lock is acquired on coordinator node.
|
|
||||||
*
|
|
||||||
* Notice that the locking functions is sent to all workers regardless of if
|
|
||||||
* it has metadata or not. This is because a worker node only knows itself
|
|
||||||
* and previous workers that has metadata sync turned on. The node does not
|
|
||||||
* know about other nodes that have metadata sync turned on afterwards.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|
||||||
{
|
|
||||||
Oid relationId = InvalidOid;
|
|
||||||
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
|
||||||
const char *lockModeText = LockModeToLockModeText(lockMode);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* We want to acquire locks in the same order across the nodes.
|
|
||||||
* Although relation ids may change, their ordering will not.
|
|
||||||
*/
|
|
||||||
relationIdList = SortList(relationIdList, CompareOids);
|
|
||||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
|
||||||
|
|
||||||
UseCoordinatedTransaction();
|
|
||||||
|
|
||||||
int32 localGroupId = GetLocalGroupId();
|
|
||||||
|
|
||||||
foreach_oid(relationId, relationIdList)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* We only acquire distributed lock on relation if
|
|
||||||
* the relation is sync'ed between mx nodes.
|
|
||||||
*
|
|
||||||
* Even if users disable metadata sync, we cannot
|
|
||||||
* allow them not to acquire the remote locks.
|
|
||||||
* Hence, we have !IsCoordinator() check.
|
|
||||||
*/
|
|
||||||
if (ShouldSyncTableMetadata(relationId) || !IsCoordinator())
|
|
||||||
{
|
|
||||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
|
||||||
StringInfo lockRelationCommand = makeStringInfo();
|
|
||||||
|
|
||||||
appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS,
|
|
||||||
quote_literal_cstr(qualifiedRelationName),
|
|
||||||
lockModeText);
|
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
|
||||||
foreach_ptr(workerNode, workerNodeList)
|
|
||||||
{
|
|
||||||
const char *nodeName = workerNode->workerName;
|
|
||||||
int nodePort = workerNode->workerPort;
|
|
||||||
|
|
||||||
/* if local node is one of the targets, acquire the lock locally */
|
|
||||||
if (workerNode->groupId == localGroupId)
|
|
||||||
{
|
|
||||||
LockRelationOid(relationId, lockMode);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,7 @@
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
|
|
||||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||||
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
|
int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_IMMEDIATE;
|
||||||
|
@ -163,7 +164,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
parsetree = pstmt->utilityStmt;
|
parsetree = pstmt->utilityStmt;
|
||||||
|
|
||||||
if (IsA(parsetree, TransactionStmt) ||
|
if (IsA(parsetree, TransactionStmt) ||
|
||||||
IsA(parsetree, LockStmt) ||
|
|
||||||
IsA(parsetree, ListenStmt) ||
|
IsA(parsetree, ListenStmt) ||
|
||||||
IsA(parsetree, NotifyStmt) ||
|
IsA(parsetree, NotifyStmt) ||
|
||||||
IsA(parsetree, ExecuteStmt) ||
|
IsA(parsetree, ExecuteStmt) ||
|
||||||
|
@ -468,6 +468,36 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
||||||
PreprocessTruncateStatement((TruncateStmt *) parsetree);
|
PreprocessTruncateStatement((TruncateStmt *) parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (IsA(parsetree, LockStmt))
|
||||||
|
{
|
||||||
|
LockStmt *stmt = (LockStmt *) parsetree;
|
||||||
|
List *distributedRelationList = NIL;
|
||||||
|
|
||||||
|
RangeVar *rangeVar = NULL;
|
||||||
|
foreach_ptr(rangeVar, stmt->relations)
|
||||||
|
{
|
||||||
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
|
|
||||||
|
if (!IsCitusTable(relationId))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (list_member_oid(distributedRelationList, relationId))
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
distributedRelationList = lappend_oid(distributedRelationList, relationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (distributedRelationList != NIL)
|
||||||
|
{
|
||||||
|
AcquireDistributedLockOnRelations(distributedRelationList, stmt->mode,
|
||||||
|
stmt->nowait);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
||||||
* and distribute the partition if necessary.
|
* and distribute the partition if necessary.
|
||||||
|
|
|
@ -6,3 +6,14 @@ DROP FUNCTION pg_catalog.worker_hash_partition_table(bigint, integer, text, text
|
||||||
DROP FUNCTION pg_catalog.worker_merge_files_into_table(bigint, integer, text[], text[]);
|
DROP FUNCTION pg_catalog.worker_merge_files_into_table(bigint, integer, text[], text[]);
|
||||||
DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, text, oid, anyarray);
|
DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, text, oid, anyarray);
|
||||||
DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint);
|
DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint);
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION lock_relation_if_exists(table_name text, lock_mode text, nowait boolean)
|
||||||
|
RETURNS BOOL
|
||||||
|
LANGUAGE C STRICT as 'MODULE_PATHNAME',
|
||||||
|
$$lock_relation_if_exists$$;
|
||||||
|
COMMENT ON FUNCTION lock_relation_if_exists(table_name text, lock_mode text, nowait boolean)
|
||||||
|
IS 'locks relation in the lock_mode if the relation exists';
|
||||||
|
|
||||||
|
RESET search_path;
|
||||||
|
|
|
@ -975,6 +975,15 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
|
||||||
text *lockModeText = PG_GETARG_TEXT_P(1);
|
text *lockModeText = PG_GETARG_TEXT_P(1);
|
||||||
char *lockModeCString = text_to_cstring(lockModeText);
|
char *lockModeCString = text_to_cstring(lockModeText);
|
||||||
|
|
||||||
|
bool nowait = false;
|
||||||
|
if (!PG_ARGISNULL(2))
|
||||||
|
{
|
||||||
|
nowait = PG_GETARG_BOOL(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
ereport(NOTICE, errmsg("%s %s %s", text_to_cstring(relationName), text_to_cstring(
|
||||||
|
lockModeText), nowait ? "nowait" : "wait"));
|
||||||
|
|
||||||
/* ensure that we're in a transaction block */
|
/* ensure that we're in a transaction block */
|
||||||
RequireTransactionBlock(true, "lock_relation_if_exists");
|
RequireTransactionBlock(true, "lock_relation_if_exists");
|
||||||
|
|
||||||
|
@ -985,8 +994,11 @@ lock_relation_if_exists(PG_FUNCTION_ARGS)
|
||||||
List *relationNameList = textToQualifiedNameList(relationName);
|
List *relationNameList = textToQualifiedNameList(relationName);
|
||||||
RangeVar *relation = makeRangeVarFromNameList(relationNameList);
|
RangeVar *relation = makeRangeVarFromNameList(relationNameList);
|
||||||
|
|
||||||
|
uint32 nowaitFlag = nowait ? RVR_NOWAIT : 0;
|
||||||
|
|
||||||
/* lock the relation with the lock mode */
|
/* lock the relation with the lock mode */
|
||||||
Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK,
|
Oid relationId = RangeVarGetRelidExtended(relation, lockMode, RVR_MISSING_OK |
|
||||||
|
nowaitFlag,
|
||||||
CitusRangeVarCallbackForLockTable,
|
CitusRangeVarCallbackForLockTable,
|
||||||
(void *) &lockMode);
|
(void *) &lockMode);
|
||||||
bool relationExists = OidIsValid(relationId);
|
bool relationExists = OidIsValid(relationId);
|
||||||
|
@ -1060,3 +1072,78 @@ CitusLockTableAclCheck(Oid relationId, LOCKMODE lockmode, Oid userId)
|
||||||
|
|
||||||
return aclResult;
|
return aclResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
|
||||||
|
* for given list of relations ids. Relation id list and worker node list
|
||||||
|
* sorted so that the lock is acquired in the same order regardless of which
|
||||||
|
* node it was run on. Notice that no lock is acquired on coordinator node.
|
||||||
|
*
|
||||||
|
* Notice that the locking functions is sent to all workers regardless of if
|
||||||
|
* it has metadata or not. This is because a worker node only knows itself
|
||||||
|
* and previous workers that has metadata sync turned on. The node does not
|
||||||
|
* know about other nodes that have metadata sync turned on afterwards.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode, bool nowait)
|
||||||
|
{
|
||||||
|
Oid relationId = InvalidOid;
|
||||||
|
List *workerNodeList = ActivePrimaryNodeList(NoLock);
|
||||||
|
|
||||||
|
const char *lockModeText = LockModeToLockModeText(lockMode);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We want to acquire locks in the same order across the nodes.
|
||||||
|
* Although relation ids may change, their ordering will not.
|
||||||
|
*/
|
||||||
|
relationIdList = SortList(relationIdList, CompareOids);
|
||||||
|
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||||
|
|
||||||
|
UseCoordinatedTransaction();
|
||||||
|
|
||||||
|
int32 localGroupId = GetLocalGroupId();
|
||||||
|
|
||||||
|
foreach_oid(relationId, relationIdList)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* We only acquire distributed lock on relation if
|
||||||
|
* the relation is sync'ed between mx nodes.
|
||||||
|
*
|
||||||
|
* Even if users disable metadata sync, we cannot
|
||||||
|
* allow them not to acquire the remote locks.
|
||||||
|
* Hence, we have !IsCoordinator() check.
|
||||||
|
*/
|
||||||
|
if (ShouldSyncTableMetadata(relationId) || !IsCoordinator())
|
||||||
|
{
|
||||||
|
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||||
|
StringInfo lockRelationCommand = makeStringInfo();
|
||||||
|
|
||||||
|
char *lockCommand = nowait ? LOCK_RELATION_IF_EXISTS_NOWAIT :
|
||||||
|
LOCK_RELATION_IF_EXISTS;
|
||||||
|
appendStringInfo(lockRelationCommand, lockCommand,
|
||||||
|
quote_literal_cstr(qualifiedRelationName),
|
||||||
|
lockModeText);
|
||||||
|
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
foreach_ptr(workerNode, workerNodeList)
|
||||||
|
{
|
||||||
|
const char *nodeName = workerNode->workerName;
|
||||||
|
int nodePort = workerNode->workerPort;
|
||||||
|
|
||||||
|
/* if local node is one of the targets, acquire the lock locally */
|
||||||
|
if (workerNode->groupId == localGroupId)
|
||||||
|
{
|
||||||
|
DirectFunctionCall3(
|
||||||
|
lock_relation_if_exists,
|
||||||
|
(Datum) cstring_to_text(qualifiedRelationName),
|
||||||
|
(Datum) cstring_to_text(lockModeText),
|
||||||
|
(Datum) nowait);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -110,6 +110,10 @@ typedef enum CitusOperations
|
||||||
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
|
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
|
||||||
|
|
||||||
|
|
||||||
|
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, '%s');"
|
||||||
|
#define LOCK_RELATION_IF_EXISTS_NOWAIT \
|
||||||
|
"SELECT lock_relation_if_exists(%s, '%s', nowait => true);"
|
||||||
|
|
||||||
/* Lock shard/relation metadata for safe modifications */
|
/* Lock shard/relation metadata for safe modifications */
|
||||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||||
extern void LockPlacementCleanup(void);
|
extern void LockPlacementCleanup(void);
|
||||||
|
@ -151,5 +155,6 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList,
|
||||||
/* Lock mode translation between text and enum */
|
/* Lock mode translation between text and enum */
|
||||||
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
|
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
|
||||||
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
|
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
|
||||||
|
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode,
|
||||||
|
bool nowait);
|
||||||
#endif /* RESOURCE_LOCK_H */
|
#endif /* RESOURCE_LOCK_H */
|
||||||
|
|
Loading…
Reference in New Issue