Add support for distributing the LOCK command

pull/5938/head
gledis69 2022-05-20 10:39:22 +03:00
parent 757ecba968
commit 4731630741
24 changed files with 2116 additions and 143 deletions

View File

@ -44,7 +44,6 @@
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
static List * TruncateTaskList(Oid relationId);
@ -244,7 +243,13 @@ PreprocessTruncateStatement(TruncateStmt *truncateStatement)
ErrorIfUnsupportedTruncateStmt(truncateStatement);
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
LockTruncatedRelationMetadataInWorkers(truncateStatement);
uint32 lockAcquiringMode = truncateStatement->behavior == DROP_CASCADE ?
DIST_LOCK_REFERENCING_TABLES :
DIST_LOCK_DEFAULT;
AcquireDistributedLockOnRelations(truncateStatement->relations, AccessExclusiveLock,
lockAcquiringMode);
}
@ -341,69 +346,3 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
}
}
}
/*
* LockTruncatedRelationMetadataInWorkers determines if distributed
* lock is necessary for truncated relations, and acquire locks.
*
* LockTruncatedRelationMetadataInWorkers handles distributed locking
* of truncated tables before standard utility takes over.
*
* Actual distributed truncation occurs inside truncate trigger.
*
* This is only for distributed serialization of truncate commands.
* The function assumes that there is no foreign key relation between
* non-distributed and distributed relations.
*/
static void
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
{
/* nothing to do if there is no metadata at worker nodes */
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync)
{
return;
}
List *distributedRelationList = NIL;
/*
* this is used to enforce the lock order:
* [...TruncatedTables], [...TablesTruncatedFromCascadingOnTruncatedTables]
*/
List *referencingRelationIds = NIL;
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, truncateStatement->relations)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
if (!IsCitusTable(relationId))
{
continue;
}
distributedRelationList = list_append_unique_oid(distributedRelationList,
relationId);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
Oid referencingRelationId = InvalidOid;
foreach_oid(referencingRelationId, referencingTableList)
{
referencingRelationIds = lappend_oid(referencingRelationIds,
referencingRelationId);
}
}
distributedRelationList = list_concat_unique_oid(distributedRelationList,
referencingRelationIds);
if (distributedRelationList != NIL)
{
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
}
}

View File

@ -163,7 +163,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
parsetree = pstmt->utilityStmt;
if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, LockStmt) ||
IsA(parsetree, ListenStmt) ||
IsA(parsetree, NotifyStmt) ||
IsA(parsetree, ExecuteStmt) ||
@ -468,6 +467,18 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PreprocessTruncateStatement((TruncateStmt *) parsetree);
}
if (IsA(parsetree, LockStmt))
{
/*
* PreprocessLockStatement might lock the relations locally if the
* node executing the command is in pg_dist_node. Even though standard
* process utility will then re-acquire the locks on the same relations
* when the node is in the metadata (i.e. in pg_dist_node), that is not
* a problem: re-acquiring a lock that is already held is harmless, and
* locking here first ensures a consistent locking order between the nodes.
*/
PreprocessLockStatement((LockStmt *) parsetree, context);
}
/*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary.

View File

@ -251,10 +251,7 @@ FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok)
continue;
}
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
if (IsObjectDistributed(&viewAddress))
if (IsViewDistributed(viewOid))
{
distributedViewNames = lappend(distributedViewNames,
possiblyQualifiedViewName);
@ -427,6 +424,21 @@ AlterViewOwnerCommand(Oid viewOid)
}
/*
* IsViewDistributed checks if a view is distributed
*/
bool
IsViewDistributed(Oid viewOid)
{
Assert(get_rel_relkind(viewOid) == RELKIND_VIEW);
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
return IsObjectDistributed(&viewAddress);
}
/*
* PreprocessAlterViewStmt is invoked for alter view statements.
*/

View File

@ -74,7 +74,7 @@
#include "distributed/shared_library_init.h" #include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h" #include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h" #include "distributed/subplan_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h" #include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h" #include "distributed/transaction_recovery.h"
#include "distributed/utils/directory.h" #include "distributed/utils/directory.h"
@ -670,6 +670,26 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.allow_unsafe_locks_from_workers",
gettext_noop("Enables acquiring a distributed lock from a worker "
"when the coordinator is not in the metadata"),
gettext_noop("Set to false by default. If set to true, enables "
"acquiring a distributed lock from a worker "
"when the coordinator is not in the metadata. "
"This type of lock is unsafe because the worker will not be "
"able to lock the coordinator; the coordinator will be able to "
"intialize distributed operations on the resources locked "
"by the worker. This can lead to concurrent operations from the "
"coordinator and distributed deadlocks since the coordinator "
"and the workers would not acquire locks across the same nodes "
"in the same order."),
&EnableAcquiringUnsafeLockFromWorkers,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.check_available_space_before_move",
gettext_noop("When enabled will check free disk space before a shard move"),

View File

@ -245,3 +245,24 @@ GenerateListFromElement(void *listElement, int listLength)
return list;
}
/*
* list_filter_oid filters a list of OIDs, keeping only the elements
* for which the keepElement function returns true
*/
List *
list_filter_oid(List *list, bool (*keepElement)(Oid element))
{
List *result = NIL;
Oid element = InvalidOid;
foreach_oid(element, list)
{
if (keepElement(element))
{
result = lappend_oid(result, element);
}
}
return result;
}
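For illustration only, not part of this commit: a minimal sketch of how a caller might use list_filter_oid to keep just the relations whose metadata is synced to workers, which is how the locking code later in this change uses it. The wrapper name is hypothetical and the relevant Citus headers are assumed to be included.

/* Hypothetical wrapper: keep only relations whose metadata is synced to workers.
 * ShouldSyncTableMetadata already matches the bool (*)(Oid) signature expected here. */
static List *
FilterSyncedRelationIds(List *relationIdList)
{
	return list_filter_oid(relationIdList, &ShouldSyncTableMetadata);
}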

View File

@ -21,6 +21,7 @@
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/metadata_utility.h" #include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
@ -39,12 +40,14 @@
#include "distributed/utils/array_type.h" #include "distributed/utils/array_type.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
#include "distributed/worker_shard_visibility.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/varlena.h" #include "utils/varlena.h"
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, %s);" #define LOCK_RELATION_IF_EXISTS \
"SELECT pg_catalog.lock_relation_if_exists(%s, %s);"
/* static definition and declarations */ /* static definition and declarations */
struct LockModeToStringType struct LockModeToStringType
@ -71,6 +74,17 @@ static const struct LockModeToStringType lockmode_to_string_map[] = {
static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) /
sizeof(lockmode_to_string_map[0]);
/*
* LockRelationRecord holds the oid of a relation to be locked
* and a boolean inh that determines whether its descendants
* should be locked as well
*/
typedef struct LockRelationRecord
{
Oid relationId;
bool inh;
} LockRelationRecord;
/* local function forward declarations */
static LOCKMODE IntToLockMode(int mode);
@ -92,6 +106,8 @@ PG_FUNCTION_INFO_V1(lock_shard_metadata);
PG_FUNCTION_INFO_V1(lock_shard_resources);
PG_FUNCTION_INFO_V1(lock_relation_if_exists);
/* Config variable managed via guc.c */
bool EnableAcquiringUnsafeLockFromWorkers = false;
/*
* lock_shard_metadata allows the shard distribution metadata to be locked
@ -1081,14 +1097,110 @@ EnsureCanAcquireLock(Oid relationId, LOCKMODE lockMode)
/*
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
* for given list of relations ids. Worker node list is sorted so that the lock
* is acquired in the same order regardless of which node it was run on. Notice
* that no lock is acquired on coordinator node if the coordinator is not added
* to the metadata.
*/
void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
/*
* CreateLockTerminationString creates a string that can be appended to the
* end of a partial lock command to properly terminate the command
*/
static const char *
CreateLockTerminationString(const char *lockModeText, bool nowait)
{
StringInfo lockTerminationStringInfo = makeStringInfo();
appendStringInfo(lockTerminationStringInfo, nowait ? " IN %s MODE NOWAIT;\n" :
" IN %s MODE;\n", lockModeText);
return lockTerminationStringInfo->data;
}
/*
* FinishLockCommandIfNecessary appends the lock termination string if the lock command is partial.
* Sets the partialLockCommand flag to false
*/
static void
FinishLockCommandIfNecessary(StringInfo lockCommand, const char *lockTerminationString,
bool *partialLockCommand)
{
if (*partialLockCommand)
{
appendStringInfo(lockCommand, "%s", lockTerminationString);
}
*partialLockCommand = false;
}
/*
* LockRelationRecordListMember checks if a relation id is present in the
* LockRelationRecord list
*/
static bool
LockRelationRecordListMember(List *lockRelationRecordList, Oid relationId)
{
LockRelationRecord *record = NULL;
foreach_ptr(record, lockRelationRecordList)
{
if (record->relationId == relationId)
{
return true;
}
}
return false;
}
/*
* MakeLockRelationRecord makes a LockRelationRecord using the relation oid
* and the inh boolean while properly allocating the structure
*/
static LockRelationRecord *
MakeLockRelationRecord(Oid relationId, bool inh)
{
LockRelationRecord *lockRelationRecord = palloc(sizeof(LockRelationRecord));
lockRelationRecord->relationId = relationId;
lockRelationRecord->inh = inh;
return lockRelationRecord;
}
/*
* ConcatLockRelationRecordList concatenates a list of LockRelationRecords with
* new records created from the given relation OIDs that are not already present
* in the first list; the given inh flag is applied to all newly created records
*/
static List *
ConcatLockRelationRecordList(List *lockRelationRecordList, List *relationOidList, bool
inh)
{
List *constructedList = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationOidList)
{
if (!LockRelationRecordListMember(lockRelationRecordList, relationId))
{
LockRelationRecord *record = MakeLockRelationRecord(relationId, inh);
constructedList = lappend(constructedList, (void *) record);
}
}
return list_concat(lockRelationRecordList, constructedList);
}
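For illustration only, not part of this commit: a small hypothetical sketch of the de-duplication and inh semantics, where relationA and relationB stand for arbitrary relation OIDs.

/* Start with a record for relationA that also locks its descendants (inh = true). */
List *records = list_make1(MakeLockRelationRecord(relationA, true));

/* Append records for {relationA, relationB} with inh = false; relationA is skipped
 * because it is already present, so only relationB is added. */
records = ConcatLockRelationRecordList(records, list_make2_oid(relationA, relationB), false);

/* records now holds: {relationA, inh = true}, {relationB, inh = false} */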
/*
* AcquireDistributedLockOnRelations_Internal acquires a distributed lock on worker
* nodes for the given list of lock relation records. The worker node list is sorted
* so that the lock is acquired in the same order regardless of which node it was run on.
*
* If the nowait flag is set, the locks are required to be available immediately,
* and an error is thrown if they are not.
*
* Notice that no validation or filtering is done on the relation ids; that is the
* responsibility of this function's caller.
*/
static void
AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
LOCKMODE lockMode, bool nowait)
{
const char *lockModeText = LockModeToLockModeText(lockMode);
@ -1104,22 +1216,15 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
*/
bool startedLockCommand = false;
/* create a lock termination string used to terminate a partial lock command */
const char *lockTerminationString = CreateLockTerminationString(lockModeText, nowait);
int lockedRelations = 0;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
LockRelationRecord *lockRelationRecord;
foreach_ptr(lockRelationRecord, lockRelationRecordList)
{
/*
* we want to prevent under privileged users to trigger establishing connections,
* that's why we have this additional check. Otherwise, we'd get the errors as
* soon as we execute the command on any node
*/
EnsureCanAcquireLock(relationId, lockMode);
if (!ShouldSyncTableMetadata(relationId))
{
continue;
}
Oid relationId = lockRelationRecord->relationId;
bool lockDescendants = lockRelationRecord->inh;
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
/*
@ -1128,12 +1233,18 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
*/
if (get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE)
{
/* finish the partial lock statement */
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
startedLockCommand = false;
}
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
&startedLockCommand);
/*
* The user should not be able to trigger this codepath
* with nowait = true or lockDescendants = false since the
* only way to do that is calling LOCK * IN * MODE NOWAIT;
* and LOCK ONLY * IN * MODE; respectively but foreign tables
* cannot be locked with LOCK command as of pg14
*/
Assert(nowait == false);
Assert(lockDescendants == true);
/* use lock_relation_if_exits to lock foreign table */
appendStringInfo(lockRelationsCommand, LOCK_RELATION_IF_EXISTS,
@ -1144,12 +1255,16 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
else if (startedLockCommand)
{
/* append relation to partial lock statement */
appendStringInfo(lockRelationsCommand, ", %s", qualifiedRelationName);
appendStringInfo(lockRelationsCommand, ",%s%s",
lockDescendants ? " " : " ONLY ",
qualifiedRelationName);
}
else
{
/* start a new partial lock statement */
appendStringInfo(lockRelationsCommand, "LOCK %s", qualifiedRelationName);
appendStringInfo(lockRelationsCommand, "LOCK%s%s",
lockDescendants ? " " : " ONLY ",
qualifiedRelationName);
startedLockCommand = true;
}
@ -1161,14 +1276,11 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
return;
}
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
}
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
&startedLockCommand);
appendStringInfo(lockRelationsCommand, ENABLE_DDL_PROPAGATION);
const char *lockCommand = lockRelationsCommand->data;
char *lockCommand = lockRelationsCommand->data;
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES, NoLock);
@ -1181,6 +1293,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
const char *currentUser = CurrentUserName();
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
@ -1193,6 +1306,126 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
continue;
}
SendCommandToWorker(nodeName, nodePort, lockCommand);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
currentUser, list_make1(
lockCommand));
}
}
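For illustration only, not part of this commit: a minimal, self-contained sketch of the command shape the loop above assembles for two relations, one of them locked with ONLY because its record has inh = false. The relation names are invented and PostgreSQL's stringinfo API is assumed; in the real code the command additionally re-enables DDL propagation and is sent to the metadata worker nodes as the current user within the coordinated transaction.

#include "postgres.h"
#include "lib/stringinfo.h"

static char *
BuildExampleLockCommand(void)
{
	StringInfo cmd = makeStringInfo();

	/* first record starts the statement; inh = true, so no ONLY keyword */
	appendStringInfo(cmd, "LOCK %s", "public.dist_table");

	/* further records are appended to the same statement; inh = false adds ONLY */
	appendStringInfo(cmd, ", ONLY %s", "public.parent_table");

	/* the termination string supplies the lock mode and the optional NOWAIT */
	appendStringInfo(cmd, " IN %s MODE NOWAIT;\n", "ACCESS EXCLUSIVE");

	/*
	 * cmd->data now reads roughly:
	 * LOCK public.dist_table, ONLY public.parent_table IN ACCESS EXCLUSIVE MODE NOWAIT;
	 */
	return cmd->data;
}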
/*
* AcquireDistributedLockOnRelations filters relations before passing them to
* AcquireDistributedLockOnRelations_Internal to acquire the locks.
*
* Only tables, views, and foreign tables can be locked with this function. Other relations
* will cause an error.
*
* Skips non-distributed relations.
* The configs parameter controls which relations will be locked and whether the lock
* should throw an error if it cannot be acquired immediately.
*/
void
AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode, uint32 configs)
{
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync || !EnableDDLPropagation)
{
return;
}
/* used to store the relations that will be locked */
List *distributedRelationRecordsList = NIL;
bool nowait = (configs & DIST_LOCK_NOWAIT) > 0;
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, relationList)
{
Oid relationId = RangeVarGetRelidExtended(rangeVar, AccessShareLock, nowait ?
RVR_NOWAIT : 0, NULL, NULL);
LockRelationRecord *lockRelationRecord = MakeLockRelationRecord(relationId,
rangeVar->inh);
/*
* Note that allowing the user to lock shards could lead to
* distributed deadlocks due to shards not being locked when
* a distributed table is locked.
* However, because citus.enable_manual_changes_to_shards
* is a guc which is not visible by default, whoever is using this
* guc will hopefully know what they're doing and avoid such scenarios.
*/
ErrorIfIllegallyChangingKnownShard(relationId);
/*
* we want to prevent under privileged users to trigger establishing connections,
* that's why we have this additional check. Otherwise, we'd get the errors as
* soon as we execute the command on any node
*/
EnsureCanAcquireLock(relationId, lockMode);
bool isView = get_rel_relkind(relationId) == RELKIND_VIEW;
if ((isView && !IsViewDistributed(relationId)) ||
(!isView && !ShouldSyncTableMetadata(relationId)))
{
continue;
}
if (!LockRelationRecordListMember(distributedRelationRecordsList, relationId))
{
distributedRelationRecordsList = lappend(distributedRelationRecordsList,
(void *) lockRelationRecord);
}
if ((configs & DIST_LOCK_REFERENCING_TABLES) > 0)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
/* remove the relations which should not be synced */
referencingTableList = list_filter_oid(referencingTableList,
&ShouldSyncTableMetadata);
distributedRelationRecordsList = ConcatLockRelationRecordList(
distributedRelationRecordsList, referencingTableList, true);
}
}
if (distributedRelationRecordsList != NIL)
{
if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode() &&
!EnableAcquiringUnsafeLockFromWorkers)
{
ereport(ERROR,
(errmsg(
"Cannot acquire a distributed lock from a worker node since the "
"coordinator is not in the metadata."),
errhint(
"Either run this command on the coordinator or add the coordinator "
"in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);\n"
"Alternatively, though it is not recommended, you can allow this command by running: "
"SET citus.allow_unsafe_locks_from_workers TO 'on';")));
}
AcquireDistributedLockOnRelations_Internal(distributedRelationRecordsList,
lockMode, nowait);
}
}
/*
* PreprocessLockStatement makes sure that the lock is allowed
* before calling AcquireDistributedLockOnRelations on the locked tables/views,
* passing DIST_LOCK_NOWAIT when the LOCK statement specifies NOWAIT
*/
void
PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context)
{
bool isTopLevel = context == PROCESS_UTILITY_TOPLEVEL;
RequireTransactionBlock(isTopLevel, "LOCK TABLE");
uint32 nowaitFlag = stmt->nowait ? DIST_LOCK_NOWAIT : 0;
AcquireDistributedLockOnRelations(stmt->relations, stmt->mode, nowaitFlag);
}

View File

@ -536,6 +536,7 @@ extern char * CreateViewDDLCommand(Oid viewOid);
extern char * AlterViewOwnerCommand(Oid viewOid);
extern char * DeparseViewStmt(Node *node);
extern char * DeparseDropViewStmt(Node *node);
extern bool IsViewDistributed(Oid viewOid);
extern List * CreateViewDDLCommandsIdempotent(Oid viewOid);
extern List * PreprocessAlterViewStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);

View File

@ -176,5 +176,6 @@ extern List * ListTake(List *pointerList, int size);
extern void * safe_list_nth(const List *list, int index);
extern List * GeneratePositiveIntSequenceList(int upTo);
extern List * GenerateListFromElement(void *listElement, int listLength);
extern List * list_filter_oid(List *list, bool (*keepElement)(Oid element));
#endif /* CITUS_LISTUTILS_H */

View File

@ -16,6 +16,7 @@
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "storage/lock.h" #include "storage/lock.h"
#include "tcop/utility.h"
/* /*
@ -109,6 +110,26 @@ typedef enum CitusOperations
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
/*
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations
*/
enum DistLockConfigs
{
/*
* lock citus tables
*/
DIST_LOCK_DEFAULT = 0,
/*
* lock tables that refer to locked citus tables with a foreign key
*/
DIST_LOCK_REFERENCING_TABLES = 1,
/*
* throw an error if the lock is not immediately available
*/
DIST_LOCK_NOWAIT = 2
};
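For illustration only, not part of this commit: the values are distinct bits and AcquireDistributedLockOnRelations tests them with bitwise AND, so a caller can combine them. The helper below and its argument are hypothetical; in this commit the TRUNCATE path passes DIST_LOCK_REFERENCING_TABLES for CASCADE, and PreprocessLockStatement passes DIST_LOCK_NOWAIT when the LOCK statement specifies NOWAIT.

/* Hypothetical helper: lock the given relations plus any tables referencing them
 * via foreign keys, and error out instead of waiting if a lock is unavailable. */
static void
LockRelationsAndReferencersNoWait(List *relationRangeVarList)
{
	uint32 configs = DIST_LOCK_REFERENCING_TABLES | DIST_LOCK_NOWAIT;

	AcquireDistributedLockOnRelations(relationRangeVarList, AccessExclusiveLock,
									  configs);
}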
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
@ -117,7 +138,6 @@ extern bool TryLockPlacementCleanup(void);
extern void EnsureShardOwner(uint64 shardId, bool missingOk);
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
extern void BlockWritesToShardList(List *shardList);
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
/* Lock shard/relation metadata of the referenced reference table if exists */
extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId,
@ -152,5 +172,10 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList,
/* Lock mode translation between text and enum */
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
extern void AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode,
uint32 configs);
extern void PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context);
extern bool EnableAcquiringUnsafeLockFromWorkers;
#endif /* RESOURCE_LOCK_H */

View File

@ -214,10 +214,10 @@ def save_regression_diff(name, output_dir):
shutil.move(path, new_file_path)
def sync_metadata_to_workers(pg_path, worker_ports, coordinator_port):
def stop_metadata_to_workers(pg_path, worker_ports, coordinator_port):
for port in worker_ports:
command = (
"SELECT * from start_metadata_sync_to_node('localhost', {port});".format(
"SELECT * from stop_metadata_sync_to_node('localhost', {port});".format(
port=port
)
)
@ -286,8 +286,8 @@ def initialize_citus_cluster(bindir, datadir, settings, config):
start_databases(bindir, datadir, config.node_name_to_ports, config.name, config.env_variables)
create_citus_extension(bindir, config.node_name_to_ports.values())
add_workers(bindir, config.worker_ports, config.coordinator_port())
if config.is_mx:
sync_metadata_to_workers(bindir, config.worker_ports, config.coordinator_port())
if not config.is_mx:
stop_metadata_to_workers(bindir, config.worker_ports, config.coordinator_port())
if config.add_coordinator_to_metadata:
add_coordinator_to_metadata(bindir, config.coordinator_port())
config.setup_steps()

View File

@ -176,7 +176,6 @@ class CitusUpgradeConfig(CitusBaseClusterConfig):
self.user = SUPER_USER_NAME
self.mixed_mode = arguments["--mixed"]
self.fixed_port = 57635
self.is_mx = False
class PostgresConfig(CitusDefaultClusterConfig):
@ -341,6 +340,9 @@ class CitusNonMxClusterConfig(CitusDefaultClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.is_mx = False
# citus does not support distributed functions
# when metadata is not synced
self.skip_tests = ["function_create", "functions"]
class PGUpgradeConfig(CitusBaseClusterConfig):

View File

@ -0,0 +1,155 @@
CREATE SCHEMA distribute_lock_tests;
SET search_path TO distribute_lock_tests;
SET citus.next_shard_id TO 10000;
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
-- Test acquiring lock outside transaction
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: LOCK TABLE can only be used in transaction blocks
-- Test acquiring lock inside procedure
DO $$
BEGIN
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
END;
$$;
-- Try all valid lock options; also try omitting the optional TABLE keyword.
BEGIN TRANSACTION;
LOCK TABLE dist_table IN ACCESS SHARE MODE;
LOCK dist_table IN ROW SHARE MODE;
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE MODE;
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN EXCLUSIVE MODE;
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test that when the user does not have the required permissions to lock
-- the locks are not forwarded to the workers
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
GRANT SELECT ON dist_table TO read_only_user;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE read_only_user;
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: permission denied for table dist_table
ROLLBACK;
SET citus.log_remote_commands TO OFF;
RESET ROLE;
-- test that user with view permissions can lock the tables
-- which the view is built on
CREATE VIEW myview AS SELECT * FROM dist_table;
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE TO user_with_view_permissions;
BEGIN;
LOCK myview IN ACCESS EXCLUSIVE MODE;
SELECT run_command_on_workers($$
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,AccessExclusiveLock)
(localhost,57638,t,AccessExclusiveLock)
(2 rows)
ROLLBACK;
RESET ROLE;
\c - - - :worker_1_port
SET search_path TO distribute_lock_tests;
-- Test trying to lock from a worker when the coordinator is not in the metadata
SET citus.allow_unsafe_locks_from_workers TO 'off';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: Cannot acquire a distributed lock from a worker node since the coordinator is not in the metadata.
HINT: Either run this command on the coordinator or add the coordinator in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);
Alternatively, though it is not recommended, you can allow this command by running: SET citus.allow_unsafe_locks_from_workers TO 'on';
ROLLBACK;
-- Verify that the same restriction does not apply to worker local tables
CREATE TABLE local_table(a int);
-- Verify that no locks will be distributed for the local lock
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK local_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.log_remote_commands;
-- Cleanup local table
DROP TABLE local_table;
-- Test that setting the guc to 'on' will allow the lock from workers
SET citus.allow_unsafe_locks_from_workers TO 'on';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test locking a shard
SET citus.enable_manual_changes_to_shards TO OFF;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ERROR: cannot modify "dist_table_10000" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
ROLLBACK;
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
SET citus.enable_manual_changes_to_shards TO ON;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.enable_manual_changes_to_shards;
\c - - - :master_port
DROP SCHEMA distribute_lock_tests CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table distribute_lock_tests.dist_table
drop cascades to view distribute_lock_tests.myview
SET citus.enable_ddl_propagation TO OFF;
DROP ROLE read_only_user;
DROP ROLE user_with_view_permissions;
RESET citus.enable_ddl_propagation;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
DROP USER read_only_user;
DROP USER user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)

File diff suppressed because it is too large

View File

@ -129,8 +129,8 @@ step s2-view-worker:
query |state |wait_event_type|wait_event|usename |datname
---------------------------------------------------------------------
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
(2 rows)
step s2-end:

View File

@ -18,7 +18,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -42,7 +42,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -138,7 +138,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -210,7 +210,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-insert-select:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table');
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -234,7 +234,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -330,7 +330,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -402,7 +402,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-copy:
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV');
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -426,7 +426,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -504,7 +504,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit:
COMMIT;
@ -587,7 +587,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -640,3 +640,59 @@ restore_isolation_tester_func
(1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-truncate s3-select-count-from-ref-table s1-commit-worker s1-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select-count-from-ref-table:
SELECT COUNT(*) FROM referencing_table_2;
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select-count-from-ref-table: <... completed>
count
---------------------------------------------------------------------
0
(1 row)
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -251,19 +251,13 @@ step s2-truncate:
step s1-commit:
COMMIT;
s2: WARNING: relation "public.dist_table" does not exist
s2: WARNING: relation "public.dist_table" does not exist
step s2-truncate: <... completed>
ERROR: failure on connection marked as essential: localhost:xxxxx
step s2-select:
SELECT * FROM dist_table ORDER BY 1, 2;
a|b
---------------------------------------------------------------------
1|2
3|4
5|6
(3 rows)
(0 rows)
restore_isolation_tester_func
---------------------------------------------------------------------

View File

@ -73,6 +73,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
-- make sure that pg_class queries do not get blocked on table locks
begin;
SET LOCAL citus.enable_ddl_propagation TO OFF;
lock table test_table in access exclusive mode;
prepare transaction 'take-aggressive-lock';
-- shards are hidden when using psql as application_name

View File

@ -94,5 +94,6 @@ test: isolation_metadata_sync_deadlock
test: isolation_replicated_dist_on_mx
test: isolation_replicate_reference_tables_to_coordinator
test: isolation_multiuser_locking
test: isolation_acquire_distributed_locks
test: isolation_check_mx

View File

@ -61,6 +61,7 @@ test: locally_execute_intermediate_results
test: multi_mx_alter_distributed_table
test: update_colocation_mx
test: resync_metadata_with_sequences
test: distributed_locks
# should be executed sequentially because it modifies metadata
test: local_shard_execution_dropped_column

View File

@ -464,6 +464,7 @@ push(@pgOptions, "citus.shard_replication_factor=2");
push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}"); push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}");
push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
# Some tests look at shards in pg_class, make sure we can usually see them: # Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -0,0 +1,242 @@
#include "isolation_mx_common.include.spec"
setup
{
SELECT citus_set_coordinator_host('localhost', 57636);
CREATE TABLE dist_table(a int);
CREATE TABLE citus_local_table(a int);
CREATE TABLE local_table(a int);
CREATE TABLE ref_table(a int);
CREATE TABLE partitioned_table(a int)
PARTITION BY RANGE(a);
CREATE TABLE partition_1 PARTITION OF partitioned_table
FOR VALUES FROM (1) TO (11);
CREATE TABLE partition_2 PARTITION OF partitioned_table
FOR VALUES FROM (11) TO (21);
SELECT create_distributed_table('dist_table', 'a');
SELECT create_reference_table('ref_table');
SELECT citus_add_local_table_to_metadata('citus_local_table');
SELECT create_distributed_table('partitioned_table', 'a');
CREATE VIEW sub_view(a) AS
SELECT 2 * a AS a
FROM ref_table;
CREATE VIEW main_view AS
SELECT t1.a a1, t2.a a2, t3.a a3
FROM dist_table t1
JOIN citus_local_table t2 ON t1.a = t2.a
JOIN sub_view t3 ON t2.a = t3.a;
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO citus_local_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO local_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO ref_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO partitioned_table SELECT n FROM generate_series(8, 12) n;
}
teardown
{
DROP VIEW main_view;
DROP VIEW sub_view;
DROP TABLE dist_table;
DROP TABLE citus_local_table;
DROP TABLE local_table;
DROP TABLE ref_table;
DROP TABLE partitioned_table;
SELECT citus_remove_node('localhost', 57636);
SELECT citus_internal.restore_isolation_tester_func();
}
// coordinator session
session "coor"
step "coor-begin"
{
BEGIN;
}
step "coor-acquire-aggresive-lock-on-dist-table"
{
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-dist-table-nowait"
{
LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT;
}
step "coor-acquire-weak-lock-on-dist-table"
{
LOCK dist_table IN ACCESS SHARE MODE;
}
step "coor-acquire-aggresive-lock-on-view"
{
LOCK main_view IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-only-view"
{
LOCK ONLY main_view IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-view-nowait"
{
LOCK main_view IN ACCESS EXCLUSIVE MODE NOWAIT;
}
step "coor-lock-all"
{
LOCK dist_table, citus_local_table, ref_table, main_view, sub_view, local_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-read-dist-table"
{
SELECT COUNT(*) FROM dist_table;
}
step "coor-read-ref-table"
{
SELECT COUNT(*) FROM ref_table;
}
step "coor-acquire-aggresive-lock-on-partitioned-table"
{
LOCK partitioned_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax"
{
LOCK partitioned_table * IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-only-partitioned-table"
{
LOCK ONLY partitioned_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-ref-table"
{
LOCK ref_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-rollback"
{
ROLLBACK;
}
// worker 1 xact session
session "w1"
step "w1-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "w1-begin"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w1-read-dist-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM dist_table');
}
step "w1-read-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM ref_table');
}
step "w1-read-citus-local-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM citus_local_table');
}
step "w1-acquire-aggressive-lock-dist-table" {
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
}
step "w1-lock-reference-table"
{
SELECT run_commands_on_session_level_connection_to_node('LOCK ref_table IN ACCESS EXCLUSIVE MODE');
}
step "w1-read-partitioned-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partitioned_table');
}
step "w1-read-partition-of-partitioned-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partition_1');
}
step "w1-read-main-view"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM main_view');
}
step "w1-rollback"
{
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
}
step "w1-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
// worker 2 xact session
session "w2"
step "w2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "w2-begin"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w2-acquire-aggressive-lock-dist-table" {
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
}
step "w2-rollback"
{
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
}
step "w2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "coor-read-dist-table" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-dist-table-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w2-start-session-level-connection" "w2-begin" "w1-acquire-aggressive-lock-dist-table" "w2-acquire-aggressive-lock-dist-table" "w1-rollback" "w1-read-dist-table" "w2-rollback" "w1-stop-connection" "w2-stop-connection"
permutation "coor-begin" "coor-acquire-weak-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-lock-reference-table" "coor-begin" "coor-read-ref-table" "w1-rollback" "coor-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-view-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-lock-all" "w1-start-session-level-connection" "w1-begin" "w1-read-citus-local-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-ref-table" "w1-start-session-level-connection" "w1-begin" "w1-read-main-view" "coor-rollback" "w1-rollback" "w1-stop-connection"

View File

@ -2,15 +2,27 @@
setup
{
CREATE TABLE truncate_table(id integer, value integer);
CREATE TABLE truncate_table(id integer, value integer, PRIMARY KEY(id));
CREATE TABLE data_table(id integer, value integer);
CREATE TABLE referencing_table_1 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES truncate_table(id));
CREATE TABLE referencing_table_2 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES referencing_table_1(id));
SELECT create_distributed_table('truncate_table', 'id');
SELECT create_distributed_table('data_table', 'id');
SELECT create_distributed_table('referencing_table_1', 'id');
SELECT create_distributed_table('referencing_table_2', 'id');
COPY truncate_table FROM PROGRAM 'echo 1, 10 && echo 2, 20 && echo 3, 30 && echo 4, 40 && echo 5, 50' WITH CSV;
COPY data_table FROM PROGRAM 'echo 20, 20 && echo 30, 30 && echo 40, 40 && echo 50, 50' WITH CSV;
}
// Create and use UDF to close the connection opened in the setup step. Also return the cluster
// back to the initial state.
teardown
{
DROP TABLE IF EXISTS data_table;
DROP TABLE IF EXISTS referencing_table_2;
DROP TABLE IF EXISTS referencing_table_1;
DROP TABLE IF EXISTS truncate_table CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
}
@ -36,7 +48,7 @@ step "s1-begin-on-worker"
step "s1-truncate" step "s1-truncate"
{ {
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table'); SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
} }
step "s1-select" step "s1-select"
@ -46,7 +58,7 @@ step "s1-select"
step "s1-insert-select" step "s1-insert-select"
{ {
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table'); SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
} }
step "s1-delete" step "s1-delete"
@ -56,7 +68,7 @@ step "s1-delete"
step "s1-copy" step "s1-copy"
{ {
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV'); SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
} }
step "s1-alter" step "s1-alter"
@@ -101,7 +113,7 @@ step "s2-begin-on-worker"
 step "s2-truncate"
 {
-    SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
+    SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
 }
 step "s2-commit-worker"
@@ -122,6 +134,11 @@ step "s3-select-count"
     SELECT COUNT(*) FROM truncate_table;
 }
+step "s3-select-count-from-ref-table"
+{
+    SELECT COUNT(*) FROM referencing_table_2;
+}
 permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
@@ -131,3 +148,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete"
 permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
 permutation "s1-begin" "s1-alter" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit" "s2-commit-worker" "s2-stop-connection" "s3-select-count"
 permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
+permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s3-select-count-from-ref-table" "s1-commit-worker" "s1-stop-connection"


@@ -0,0 +1,149 @@
CREATE SCHEMA distribute_lock_tests;
SET search_path TO distribute_lock_tests;
SET citus.next_shard_id TO 10000;
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
-- Test acquiring lock outside transaction
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
-- Test acquiring lock inside procedure
DO $$
BEGIN
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
END;
$$;
-- Try all valid lock options; also try omitting the optional TABLE keyword.
BEGIN TRANSACTION;
LOCK TABLE dist_table IN ACCESS SHARE MODE;
LOCK dist_table IN ROW SHARE MODE;
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE MODE;
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN EXCLUSIVE MODE;
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
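-- While one of the locks above is held inside an open transaction, the
-- propagated lock can be observed on the workers. An illustrative sketch,
-- reusing the pg_locks query that the view test further down runs, and
-- assuming the workers carry metadata for dist_table:
BEGIN;
LOCK dist_table IN SHARE MODE;
SELECT run_command_on_workers($$
    SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
$$);
ROLLBACK;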
-- Test that when the user does not have the permissions required to lock the table,
-- the lock commands are not forwarded to the workers
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
RESET citus.enable_ddl_propagation;
$$);
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
GRANT SELECT ON dist_table TO read_only_user;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE read_only_user;
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
SET citus.log_remote_commands TO OFF;
RESET ROLE;
-- Test that a user with permissions on a view can lock the tables
-- that the view is built on
CREATE VIEW myview AS SELECT * FROM dist_table;
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE TO user_with_view_permissions;
BEGIN;
LOCK myview IN ACCESS EXCLUSIVE MODE;
SELECT run_command_on_workers($$
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
$$);
ROLLBACK;
RESET ROLE;
\c - - - :worker_1_port
SET search_path TO distribute_lock_tests;
-- Test trying to lock from a worker when the coordinator is not in the metadata
SET citus.allow_unsafe_locks_from_workers TO 'off';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
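-- The restriction above depends on whether the coordinator is registered in
-- the metadata. An illustrative way to check that from any node, assuming
-- standard Citus metadata (the coordinator is the node in group 0):
SELECT nodename, nodeport FROM pg_dist_node WHERE groupid = 0;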
-- Verify that the same restriction does not apply to worker local tables
CREATE TABLE local_table(a int);
-- Verify that no locks will be distributed for the local lock
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK local_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.log_remote_commands;
-- Cleanup local table
DROP TABLE local_table;
-- Test that setting the GUC to 'on' allows the lock to be taken from workers
SET citus.allow_unsafe_locks_from_workers TO 'on';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test locking a shard
SET citus.enable_manual_changes_to_shards TO OFF;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
SET citus.enable_manual_changes_to_shards TO ON;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.enable_manual_changes_to_shards;
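-- dist_table_10000 above is the shard created for dist_table, following the
-- usual <table>_<shardid> naming and the citus.next_shard_id value set at the
-- top of this file. An illustrative query, assuming the worker has Citus
-- metadata synced as in this test, to list the shard ids behind the table:
SELECT shardid FROM pg_dist_shard
WHERE logicalrelid = 'distribute_lock_tests.dist_table'::regclass
ORDER BY shardid;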
\c - - - :master_port
DROP SCHEMA distribute_lock_tests CASCADE;
SET citus.enable_ddl_propagation TO OFF;
DROP ROLE read_only_user;
DROP ROLE user_with_view_permissions;
RESET citus.enable_ddl_propagation;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
DROP USER read_only_user;
DROP USER user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);


@@ -43,7 +43,9 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
 -- make sure that pg_class queries do not get blocked on table locks
 begin;
+SET LOCAL citus.enable_ddl_propagation TO OFF;
 lock table test_table in access exclusive mode;
 prepare transaction 'take-aggressive-lock';
 -- shards are hidden when using psql as application_name