mirror of https://github.com/citusdata/citus.git
Merge pull request #5938 from citusdata/propagate-lock
DESCRIPTION: * Lock statements will propagate locks to all the nodes in the metadata when a citus tables or a view is locked. Locks will also be propagated for all citus tables that appear in the definition of views that are locked (recursively applies to the views inside the locked view). * TRUNCATE-ing a citus table from a worker node is no longer allowed if the coordinator is not added to the metadata. When TRUNCATE is called on a citus table that table needs to be locked in all the nodes before the TRUCATE is executed. If the coordinator is not in the metadata, the coordinator will not be aware of the lock on the table and can access the table's shard while a TRUNCATE is executing. Despite not being recommended, this behavior can be allowed by setting `SET citus.allow_unsafe_locks_from_workers TO 'on';`. * `pg_dump` which calls LOCK TABLE is affected in the same way as TRUNCATE. An error is thrown when locking from worker when coordinator is not in the metadata.pull/5959/head
commit
b1d1df8214
|
@ -44,7 +44,6 @@
|
|||
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
||||
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
|
||||
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
|
||||
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
|
||||
static List * TruncateTaskList(Oid relationId);
|
||||
|
||||
|
||||
|
@ -244,7 +243,13 @@ PreprocessTruncateStatement(TruncateStmt *truncateStatement)
|
|||
ErrorIfUnsupportedTruncateStmt(truncateStatement);
|
||||
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
|
||||
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
|
||||
LockTruncatedRelationMetadataInWorkers(truncateStatement);
|
||||
|
||||
uint32 lockAcquiringMode = truncateStatement->behavior == DROP_CASCADE ?
|
||||
DIST_LOCK_REFERENCING_TABLES :
|
||||
DIST_LOCK_DEFAULT;
|
||||
|
||||
AcquireDistributedLockOnRelations(truncateStatement->relations, AccessExclusiveLock,
|
||||
lockAcquiringMode);
|
||||
}
|
||||
|
||||
|
||||
|
@ -341,69 +346,3 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockTruncatedRelationMetadataInWorkers determines if distributed
|
||||
* lock is necessary for truncated relations, and acquire locks.
|
||||
*
|
||||
* LockTruncatedRelationMetadataInWorkers handles distributed locking
|
||||
* of truncated tables before standard utility takes over.
|
||||
*
|
||||
* Actual distributed truncation occurs inside truncate trigger.
|
||||
*
|
||||
* This is only for distributed serialization of truncate commands.
|
||||
* The function assumes that there is no foreign key relation between
|
||||
* non-distributed and distributed relations.
|
||||
*/
|
||||
static void
|
||||
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||
{
|
||||
/* nothing to do if there is no metadata at worker nodes */
|
||||
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
List *distributedRelationList = NIL;
|
||||
|
||||
/*
|
||||
* this is used to enforce the lock order:
|
||||
* [...TruncatedTables], [...TablesTruncatedFromCascadingOnTruncatedTables]
|
||||
*/
|
||||
List *referencingRelationIds = NIL;
|
||||
|
||||
RangeVar *rangeVar = NULL;
|
||||
foreach_ptr(rangeVar, truncateStatement->relations)
|
||||
{
|
||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
distributedRelationList = list_append_unique_oid(distributedRelationList,
|
||||
relationId);
|
||||
|
||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||
Assert(cacheEntry != NULL);
|
||||
|
||||
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
|
||||
|
||||
Oid referencingRelationId = InvalidOid;
|
||||
foreach_oid(referencingRelationId, referencingTableList)
|
||||
{
|
||||
referencingRelationIds = lappend_oid(referencingRelationIds,
|
||||
referencingRelationId);
|
||||
}
|
||||
}
|
||||
|
||||
distributedRelationList = list_concat_unique_oid(distributedRelationList,
|
||||
referencingRelationIds);
|
||||
|
||||
if (distributedRelationList != NIL)
|
||||
{
|
||||
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -163,7 +163,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
parsetree = pstmt->utilityStmt;
|
||||
|
||||
if (IsA(parsetree, TransactionStmt) ||
|
||||
IsA(parsetree, LockStmt) ||
|
||||
IsA(parsetree, ListenStmt) ||
|
||||
IsA(parsetree, NotifyStmt) ||
|
||||
IsA(parsetree, ExecuteStmt) ||
|
||||
|
@ -468,6 +467,18 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
PreprocessTruncateStatement((TruncateStmt *) parsetree);
|
||||
}
|
||||
|
||||
if (IsA(parsetree, LockStmt))
|
||||
{
|
||||
/*
|
||||
* PreprocessLockStatement might lock the relations locally if the
|
||||
* node executing the command is in pg_dist_node. Even though the process
|
||||
* utility will re-acquire the locks across the same relations if the node
|
||||
* is in the metadata (in the pg_dist_node table) that should not be a problem,
|
||||
* plus it ensures consistent locking order between the nodes.
|
||||
*/
|
||||
PreprocessLockStatement((LockStmt *) parsetree, context);
|
||||
}
|
||||
|
||||
/*
|
||||
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
|
||||
* and distribute the partition if necessary.
|
||||
|
|
|
@ -251,10 +251,7 @@ FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok)
|
|||
continue;
|
||||
}
|
||||
|
||||
ObjectAddress viewAddress = { 0 };
|
||||
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
||||
|
||||
if (IsObjectDistributed(&viewAddress))
|
||||
if (IsViewDistributed(viewOid))
|
||||
{
|
||||
distributedViewNames = lappend(distributedViewNames,
|
||||
possiblyQualifiedViewName);
|
||||
|
@ -427,6 +424,21 @@ AlterViewOwnerCommand(Oid viewOid)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsViewDistributed checks if a view is distributed
|
||||
*/
|
||||
bool
|
||||
IsViewDistributed(Oid viewOid)
|
||||
{
|
||||
Assert(get_rel_relkind(viewOid) == RELKIND_VIEW);
|
||||
|
||||
ObjectAddress viewAddress = { 0 };
|
||||
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
|
||||
|
||||
return IsObjectDistributed(&viewAddress);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PreprocessAlterViewStmt is invoked for alter view statements.
|
||||
*/
|
||||
|
|
|
@ -74,7 +74,7 @@
|
|||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/statistics_collection.h"
|
||||
#include "distributed/subplan_execution.h"
|
||||
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/transaction_recovery.h"
|
||||
#include "distributed/utils/directory.h"
|
||||
|
@ -670,6 +670,26 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.allow_unsafe_locks_from_workers",
|
||||
gettext_noop("Enables acquiring a distributed lock from a worker "
|
||||
"when the coordinator is not in the metadata"),
|
||||
gettext_noop("Set to false by default. If set to true, enables "
|
||||
"acquiring a distributed lock from a worker "
|
||||
"when the coordinator is not in the metadata. "
|
||||
"This type of lock is unsafe because the worker will not be "
|
||||
"able to lock the coordinator; the coordinator will be able to "
|
||||
"intialize distributed operations on the resources locked "
|
||||
"by the worker. This can lead to concurrent operations from the "
|
||||
"coordinator and distributed deadlocks since the coordinator "
|
||||
"and the workers would not acquire locks across the same nodes "
|
||||
"in the same order."),
|
||||
&EnableAcquiringUnsafeLockFromWorkers,
|
||||
false,
|
||||
PGC_USERSET,
|
||||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomBoolVariable(
|
||||
"citus.check_available_space_before_move",
|
||||
gettext_noop("When enabled will check free disk space before a shard move"),
|
||||
|
|
|
@ -245,3 +245,24 @@ GenerateListFromElement(void *listElement, int listLength)
|
|||
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* list_filter_oid filters a list of oid-s based on a keepElement
|
||||
* function
|
||||
*/
|
||||
List *
|
||||
list_filter_oid(List *list, bool (*keepElement)(Oid element))
|
||||
{
|
||||
List *result = NIL;
|
||||
Oid element = InvalidOid;
|
||||
foreach_oid(element, list)
|
||||
{
|
||||
if (keepElement(element))
|
||||
{
|
||||
result = lappend_oid(result, element);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "catalog/namespace.h"
|
||||
#include "commands/tablecmds.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_utility.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
|
@ -39,12 +40,14 @@
|
|||
#include "distributed/utils/array_type.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/local_executor.h"
|
||||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/varlena.h"
|
||||
|
||||
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, %s);"
|
||||
#define LOCK_RELATION_IF_EXISTS \
|
||||
"SELECT pg_catalog.lock_relation_if_exists(%s, %s);"
|
||||
|
||||
/* static definition and declarations */
|
||||
struct LockModeToStringType
|
||||
|
@ -71,6 +74,17 @@ static const struct LockModeToStringType lockmode_to_string_map[] = {
|
|||
static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) /
|
||||
sizeof(lockmode_to_string_map[0]);
|
||||
|
||||
/*
|
||||
* LockRelationRecord holds the oid of a relation to be locked
|
||||
* and a boolean inh to determine whether its decendants
|
||||
* should be locked as well
|
||||
*/
|
||||
typedef struct LockRelationRecord
|
||||
{
|
||||
Oid relationId;
|
||||
bool inh;
|
||||
} LockRelationRecord;
|
||||
|
||||
|
||||
/* local function forward declarations */
|
||||
static LOCKMODE IntToLockMode(int mode);
|
||||
|
@ -92,6 +106,8 @@ PG_FUNCTION_INFO_V1(lock_shard_metadata);
|
|||
PG_FUNCTION_INFO_V1(lock_shard_resources);
|
||||
PG_FUNCTION_INFO_V1(lock_relation_if_exists);
|
||||
|
||||
/* Config variable managed via guc.c */
|
||||
bool EnableAcquiringUnsafeLockFromWorkers = false;
|
||||
|
||||
/*
|
||||
* lock_shard_metadata allows the shard distribution metadata to be locked
|
||||
|
@ -1081,14 +1097,110 @@ EnsureCanAcquireLock(Oid relationId, LOCKMODE lockMode)
|
|||
|
||||
|
||||
/*
|
||||
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
|
||||
* for given list of relations ids. Worker node list is sorted so that the lock
|
||||
* is acquired in the same order regardless of which node it was run on. Notice
|
||||
* that no lock is acquired on coordinator node if the coordinator is not added
|
||||
* to the metadata.
|
||||
* CreateLockTerminationString creates a string that can be appended to the
|
||||
* end of a partial lock command to properly terminate the command
|
||||
*/
|
||||
void
|
||||
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
||||
static const char *
|
||||
CreateLockTerminationString(const char *lockModeText, bool nowait)
|
||||
{
|
||||
StringInfo lockTerminationStringInfo = makeStringInfo();
|
||||
appendStringInfo(lockTerminationStringInfo, nowait ? " IN %s MODE NOWAIT;\n" :
|
||||
" IN %s MODE;\n", lockModeText);
|
||||
return lockTerminationStringInfo->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishLockCommandIfNecessary appends the lock termination string if the lock command is partial.
|
||||
* Sets the partialLockCommand flag to false
|
||||
*/
|
||||
static void
|
||||
FinishLockCommandIfNecessary(StringInfo lockCommand, const char *lockTerminationString,
|
||||
bool *partialLockCommand)
|
||||
{
|
||||
if (*partialLockCommand)
|
||||
{
|
||||
appendStringInfo(lockCommand, "%s", lockTerminationString);
|
||||
}
|
||||
|
||||
*partialLockCommand = false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockRelationRecordListMember checks if a relation id is present in the
|
||||
* LockRelationRecord list
|
||||
*/
|
||||
static bool
|
||||
LockRelationRecordListMember(List *lockRelationRecordList, Oid relationId)
|
||||
{
|
||||
LockRelationRecord *record = NULL;
|
||||
foreach_ptr(record, lockRelationRecordList)
|
||||
{
|
||||
if (record->relationId == relationId)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MakeLockRelationRecord makes a LockRelationRecord using the relation oid
|
||||
* and the inh boolean while properly allocating the structure
|
||||
*/
|
||||
static LockRelationRecord *
|
||||
MakeLockRelationRecord(Oid relationId, bool inh)
|
||||
{
|
||||
LockRelationRecord *lockRelationRecord = palloc(sizeof(LockRelationRecord));
|
||||
lockRelationRecord->relationId = relationId;
|
||||
lockRelationRecord->inh = inh;
|
||||
return lockRelationRecord;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConcatLockRelationRecordList concats a list of LockRelationRecord with
|
||||
* another list of LockRelationRecord created from a list of relation oid-s
|
||||
* which are not present in the first list and an inh bool which will be
|
||||
* applied across all LockRelationRecords
|
||||
*/
|
||||
static List *
|
||||
ConcatLockRelationRecordList(List *lockRelationRecordList, List *relationOidList, bool
|
||||
inh)
|
||||
{
|
||||
List *constructedList = NIL;
|
||||
|
||||
Oid relationId = InvalidOid;
|
||||
foreach_oid(relationId, relationOidList)
|
||||
{
|
||||
if (!LockRelationRecordListMember(lockRelationRecordList, relationId))
|
||||
{
|
||||
LockRelationRecord *record = MakeLockRelationRecord(relationId, inh);
|
||||
constructedList = lappend(constructedList, (void *) record);
|
||||
}
|
||||
}
|
||||
|
||||
return list_concat(lockRelationRecordList, constructedList);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AcquireDistributedLockOnRelations_Internal acquire a distributed lock on worker nodes
|
||||
* for given list of relations ids. Worker node list is sorted so that the lock
|
||||
* is acquired in the same order regardless of which node it was run on.
|
||||
*
|
||||
* A nowait flag is used to require the locks to be available immediately
|
||||
* and if that is not the case, an error will be thrown
|
||||
*
|
||||
* Notice that no validation or filtering is done on the relationIds, that is the responsibility
|
||||
* of this function's caller.
|
||||
*/
|
||||
static void
|
||||
AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
|
||||
LOCKMODE lockMode, bool nowait)
|
||||
{
|
||||
const char *lockModeText = LockModeToLockModeText(lockMode);
|
||||
|
||||
|
@ -1104,22 +1216,15 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
*/
|
||||
bool startedLockCommand = false;
|
||||
|
||||
/* create a lock termination string used to terminate a partial lock command */
|
||||
const char *lockTerminationString = CreateLockTerminationString(lockModeText, nowait);
|
||||
|
||||
int lockedRelations = 0;
|
||||
Oid relationId = InvalidOid;
|
||||
foreach_oid(relationId, relationIdList)
|
||||
LockRelationRecord *lockRelationRecord;
|
||||
foreach_ptr(lockRelationRecord, lockRelationRecordList)
|
||||
{
|
||||
/*
|
||||
* we want to prevent under privileged users to trigger establishing connections,
|
||||
* that's why we have this additional check. Otherwise, we'd get the errors as
|
||||
* soon as we execute the command on any node
|
||||
*/
|
||||
EnsureCanAcquireLock(relationId, lockMode);
|
||||
|
||||
if (!ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Oid relationId = lockRelationRecord->relationId;
|
||||
bool lockDescendants = lockRelationRecord->inh;
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
|
||||
/*
|
||||
|
@ -1128,12 +1233,18 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
*/
|
||||
if (get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
/* finish the partial lock statement */
|
||||
if (startedLockCommand)
|
||||
{
|
||||
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
|
||||
startedLockCommand = false;
|
||||
}
|
||||
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
|
||||
&startedLockCommand);
|
||||
|
||||
/*
|
||||
* The user should not be able to trigger this codepath
|
||||
* with nowait = true or lockDescendants = false since the
|
||||
* only way to do that is calling LOCK * IN * MODE NOWAIT;
|
||||
* and LOCK ONLY * IN * MODE; respectively but foreign tables
|
||||
* cannot be locked with LOCK command as of pg14
|
||||
*/
|
||||
Assert(nowait == false);
|
||||
Assert(lockDescendants == true);
|
||||
|
||||
/* use lock_relation_if_exits to lock foreign table */
|
||||
appendStringInfo(lockRelationsCommand, LOCK_RELATION_IF_EXISTS,
|
||||
|
@ -1144,12 +1255,16 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
else if (startedLockCommand)
|
||||
{
|
||||
/* append relation to partial lock statement */
|
||||
appendStringInfo(lockRelationsCommand, ", %s", qualifiedRelationName);
|
||||
appendStringInfo(lockRelationsCommand, ",%s%s",
|
||||
lockDescendants ? " " : " ONLY ",
|
||||
qualifiedRelationName);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* start a new partial lock statement */
|
||||
appendStringInfo(lockRelationsCommand, "LOCK %s", qualifiedRelationName);
|
||||
appendStringInfo(lockRelationsCommand, "LOCK%s%s",
|
||||
lockDescendants ? " " : " ONLY ",
|
||||
qualifiedRelationName);
|
||||
startedLockCommand = true;
|
||||
}
|
||||
|
||||
|
@ -1161,14 +1276,11 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
return;
|
||||
}
|
||||
|
||||
if (startedLockCommand)
|
||||
{
|
||||
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
|
||||
}
|
||||
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
|
||||
&startedLockCommand);
|
||||
|
||||
appendStringInfo(lockRelationsCommand, ENABLE_DDL_PROPAGATION);
|
||||
const char *lockCommand = lockRelationsCommand->data;
|
||||
|
||||
char *lockCommand = lockRelationsCommand->data;
|
||||
|
||||
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES, NoLock);
|
||||
|
||||
|
@ -1181,6 +1293,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
int32 localGroupId = GetLocalGroupId();
|
||||
|
||||
WorkerNode *workerNode = NULL;
|
||||
const char *currentUser = CurrentUserName();
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
{
|
||||
const char *nodeName = workerNode->workerName;
|
||||
|
@ -1193,6 +1306,126 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
|
|||
continue;
|
||||
}
|
||||
|
||||
SendCommandToWorker(nodeName, nodePort, lockCommand);
|
||||
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
|
||||
currentUser, list_make1(
|
||||
lockCommand));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AcquireDistributedLockOnRelations filters relations before passing them to
|
||||
* AcquireDistributedLockOnRelations_Internal to acquire the locks.
|
||||
*
|
||||
* Only tables, views, and foreign tables can be locked with this function. Other relations
|
||||
* will cause an error.
|
||||
*
|
||||
* Skips non distributed relations.
|
||||
* configs parameter is used to configure what relation will be locked and if the lock
|
||||
* should throw an error if it cannot be acquired immediately
|
||||
*/
|
||||
void
|
||||
AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode, uint32 configs)
|
||||
{
|
||||
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync || !EnableDDLPropagation)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
/* used to store the relations that will be locked */
|
||||
List *distributedRelationRecordsList = NIL;
|
||||
|
||||
bool nowait = (configs & DIST_LOCK_NOWAIT) > 0;
|
||||
|
||||
RangeVar *rangeVar = NULL;
|
||||
foreach_ptr(rangeVar, relationList)
|
||||
{
|
||||
Oid relationId = RangeVarGetRelidExtended(rangeVar, AccessShareLock, nowait ?
|
||||
RVR_NOWAIT : 0, NULL, NULL);
|
||||
|
||||
LockRelationRecord *lockRelationRecord = MakeLockRelationRecord(relationId,
|
||||
rangeVar->inh);
|
||||
|
||||
/*
|
||||
* Note that allowing the user to lock shards could lead to
|
||||
* distributed deadlocks due to shards not being locked when
|
||||
* a distributed table is locked.
|
||||
* However, because citus.enable_manual_changes_to_shards
|
||||
* is a guc which is not visible by default, whoever is using this
|
||||
* guc will hopefully know what they're doing and avoid such scenarios.
|
||||
*/
|
||||
ErrorIfIllegallyChangingKnownShard(relationId);
|
||||
|
||||
/*
|
||||
* we want to prevent under privileged users to trigger establishing connections,
|
||||
* that's why we have this additional check. Otherwise, we'd get the errors as
|
||||
* soon as we execute the command on any node
|
||||
*/
|
||||
EnsureCanAcquireLock(relationId, lockMode);
|
||||
|
||||
bool isView = get_rel_relkind(relationId) == RELKIND_VIEW;
|
||||
if ((isView && !IsViewDistributed(relationId)) ||
|
||||
(!isView && !ShouldSyncTableMetadata(relationId)))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!LockRelationRecordListMember(distributedRelationRecordsList, relationId))
|
||||
{
|
||||
distributedRelationRecordsList = lappend(distributedRelationRecordsList,
|
||||
(void *) lockRelationRecord);
|
||||
}
|
||||
|
||||
if ((configs & DIST_LOCK_REFERENCING_TABLES) > 0)
|
||||
{
|
||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||
Assert(cacheEntry != NULL);
|
||||
|
||||
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
|
||||
|
||||
/* remove the relations which should not be synced */
|
||||
referencingTableList = list_filter_oid(referencingTableList,
|
||||
&ShouldSyncTableMetadata);
|
||||
|
||||
distributedRelationRecordsList = ConcatLockRelationRecordList(
|
||||
distributedRelationRecordsList, referencingTableList, true);
|
||||
}
|
||||
}
|
||||
|
||||
if (distributedRelationRecordsList != NIL)
|
||||
{
|
||||
if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode() &&
|
||||
!EnableAcquiringUnsafeLockFromWorkers)
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errmsg(
|
||||
"Cannot acquire a distributed lock from a worker node since the "
|
||||
"coordinator is not in the metadata."),
|
||||
errhint(
|
||||
"Either run this command on the coordinator or add the coordinator "
|
||||
"in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);\n"
|
||||
"Alternatively, though it is not recommended, you can allow this command by running: "
|
||||
"SET citus.allow_unsafe_locks_from_workers TO 'on';")));
|
||||
}
|
||||
|
||||
AcquireDistributedLockOnRelations_Internal(distributedRelationRecordsList,
|
||||
lockMode, nowait);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* PreprocessLockStatement makes sure that the lock is allowed
|
||||
* before calling AcquireDistributedLockOnRelations on the locked tables/views
|
||||
* with flags DIST_LOCK_VIEWS_RECUR and DIST_LOCK_NOWAIT if nowait is true for
|
||||
* the lock statement
|
||||
*/
|
||||
void
|
||||
PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context)
|
||||
{
|
||||
bool isTopLevel = context == PROCESS_UTILITY_TOPLEVEL;
|
||||
RequireTransactionBlock(isTopLevel, "LOCK TABLE");
|
||||
|
||||
uint32 nowaitFlag = stmt->nowait ? DIST_LOCK_NOWAIT : 0;
|
||||
AcquireDistributedLockOnRelations(stmt->relations, stmt->mode, nowaitFlag);
|
||||
}
|
||||
|
|
|
@ -536,6 +536,7 @@ extern char * CreateViewDDLCommand(Oid viewOid);
|
|||
extern char * AlterViewOwnerCommand(Oid viewOid);
|
||||
extern char * DeparseViewStmt(Node *node);
|
||||
extern char * DeparseDropViewStmt(Node *node);
|
||||
extern bool IsViewDistributed(Oid viewOid);
|
||||
extern List * CreateViewDDLCommandsIdempotent(Oid viewOid);
|
||||
extern List * PreprocessAlterViewStmt(Node *node, const char *queryString,
|
||||
ProcessUtilityContext processUtilityContext);
|
||||
|
|
|
@ -176,5 +176,6 @@ extern List * ListTake(List *pointerList, int size);
|
|||
extern void * safe_list_nth(const List *list, int index);
|
||||
extern List * GeneratePositiveIntSequenceList(int upTo);
|
||||
extern List * GenerateListFromElement(void *listElement, int listLength);
|
||||
extern List * list_filter_oid(List *list, bool (*keepElement)(Oid element));
|
||||
|
||||
#endif /* CITUS_LISTUTILS_H */
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "distributed/worker_transaction.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "storage/lock.h"
|
||||
#include "tcop/utility.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -109,6 +110,26 @@ typedef enum CitusOperations
|
|||
(uint32) 0, \
|
||||
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
|
||||
|
||||
/*
|
||||
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations
|
||||
*/
|
||||
enum DistLockConfigs
|
||||
{
|
||||
/*
|
||||
* lock citus tables
|
||||
*/
|
||||
DIST_LOCK_DEFAULT = 0,
|
||||
|
||||
/*
|
||||
* lock tables that refer to locked citus tables with a foreign key
|
||||
*/
|
||||
DIST_LOCK_REFERENCING_TABLES = 1,
|
||||
|
||||
/*
|
||||
* throw an error if the lock is not immediately available
|
||||
*/
|
||||
DIST_LOCK_NOWAIT = 2
|
||||
};
|
||||
|
||||
/* Lock shard/relation metadata for safe modifications */
|
||||
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
|
||||
|
@ -117,7 +138,6 @@ extern bool TryLockPlacementCleanup(void);
|
|||
extern void EnsureShardOwner(uint64 shardId, bool missingOk);
|
||||
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
|
||||
extern void BlockWritesToShardList(List *shardList);
|
||||
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
|
||||
|
||||
/* Lock shard/relation metadata of the referenced reference table if exists */
|
||||
extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId,
|
||||
|
@ -152,5 +172,10 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList,
|
|||
/* Lock mode translation between text and enum */
|
||||
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
|
||||
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
|
||||
extern void AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode,
|
||||
uint32 configs);
|
||||
extern void PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context);
|
||||
|
||||
extern bool EnableAcquiringUnsafeLockFromWorkers;
|
||||
|
||||
#endif /* RESOURCE_LOCK_H */
|
||||
|
|
|
@ -0,0 +1,155 @@
|
|||
CREATE SCHEMA distribute_lock_tests;
|
||||
SET search_path TO distribute_lock_tests;
|
||||
SET citus.next_shard_id TO 10000;
|
||||
CREATE TABLE dist_table(a int);
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
-- Test acquiring lock outside transaction
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: LOCK TABLE can only be used in transaction blocks
|
||||
-- Test acquiring lock inside procedure
|
||||
DO $$
|
||||
BEGIN
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
END;
|
||||
$$;
|
||||
-- Try all valid lock options; also try omitting the optional TABLE keyword.
|
||||
BEGIN TRANSACTION;
|
||||
LOCK TABLE dist_table IN ACCESS SHARE MODE;
|
||||
LOCK dist_table IN ROW SHARE MODE;
|
||||
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN SHARE MODE;
|
||||
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
-- Test that when the user does not have the required permissions to lock
|
||||
-- the locks are not forwarded to the workers
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE read_only_user WITH LOGIN;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,SET)
|
||||
(localhost,57638,t,SET)
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE read_only_user WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
|
||||
GRANT SELECT ON dist_table TO read_only_user;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
RESET client_min_messages;
|
||||
SET ROLE read_only_user;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: permission denied for table dist_table
|
||||
ROLLBACK;
|
||||
SET citus.log_remote_commands TO OFF;
|
||||
RESET ROLE;
|
||||
-- test that user with view permissions can lock the tables
|
||||
-- which the view is built on
|
||||
CREATE VIEW myview AS SELECT * FROM dist_table;
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE user_with_view_permissions WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
|
||||
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,SET)
|
||||
(localhost,57638,t,SET)
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE user_with_view_permissions WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
|
||||
GRANT ALL ON myview TO user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
RESET client_min_messages;
|
||||
SET ROLE TO user_with_view_permissions;
|
||||
BEGIN;
|
||||
LOCK myview IN ACCESS EXCLUSIVE MODE;
|
||||
SELECT run_command_on_workers($$
|
||||
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,AccessExclusiveLock)
|
||||
(localhost,57638,t,AccessExclusiveLock)
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
RESET ROLE;
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO distribute_lock_tests;
|
||||
-- Test trying to lock from a worker when the coordinator is not in the metadata
|
||||
SET citus.allow_unsafe_locks_from_workers TO 'off';
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: Cannot acquire a distributed lock from a worker node since the coordinator is not in the metadata.
|
||||
HINT: Either run this command on the coordinator or add the coordinator in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);
|
||||
Alternatively, though it is not recommended, you can allow this command by running: SET citus.allow_unsafe_locks_from_workers TO 'on';
|
||||
ROLLBACK;
|
||||
-- Verify that the same restriction does not apply to worker local tables
|
||||
CREATE TABLE local_table(a int);
|
||||
-- Verify that no locks will be distributed for the local lock
|
||||
SET citus.log_remote_commands TO ON;
|
||||
BEGIN;
|
||||
LOCK local_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
RESET citus.log_remote_commands;
|
||||
-- Cleanup local table
|
||||
DROP TABLE local_table;
|
||||
-- Test that setting the guc to 'on' will allow the lock from workers
|
||||
SET citus.allow_unsafe_locks_from_workers TO 'on';
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
-- Test locking a shard
|
||||
SET citus.enable_manual_changes_to_shards TO OFF;
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ERROR: cannot modify "dist_table_10000" because it is a shard of a distributed table
|
||||
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
|
||||
ROLLBACK;
|
||||
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
|
||||
SET citus.enable_manual_changes_to_shards TO ON;
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
RESET citus.enable_manual_changes_to_shards;
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA distribute_lock_tests CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to table distribute_lock_tests.dist_table
|
||||
drop cascades to view distribute_lock_tests.myview
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
DROP ROLE read_only_user;
|
||||
DROP ROLE user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
DROP USER read_only_user;
|
||||
DROP USER user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,SET)
|
||||
(localhost,57638,t,SET)
|
||||
(2 rows)
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -129,8 +129,8 @@ step s2-view-worker:
|
|||
|
||||
query |state |wait_event_type|wait_event|usename |datname
|
||||
---------------------------------------------------------------------
|
||||
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
|
||||
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
|
||||
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
|
||||
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
|
||||
(2 rows)
|
||||
|
||||
step s2-end:
|
||||
|
|
|
@ -18,7 +18,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s1-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -42,7 +42,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -138,7 +138,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -210,7 +210,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s1-insert-select:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -234,7 +234,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -330,7 +330,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -402,7 +402,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s1-copy:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV');
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
@ -426,7 +426,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -504,7 +504,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
@ -587,7 +587,7 @@ run_commands_on_session_level_connection_to_node
|
|||
(1 row)
|
||||
|
||||
step s2-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
@ -640,3 +640,59 @@ restore_isolation_tester_func
|
|||
|
||||
(1 row)
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-truncate s3-select-count-from-ref-table s1-commit-worker s1-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s1-truncate:
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s3-select-count-from-ref-table:
|
||||
SELECT COUNT(*) FROM referencing_table_2;
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
step s3-select-count-from-ref-table: <... completed>
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
restore_isolation_tester_func
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -251,19 +251,13 @@ step s2-truncate:
|
|||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
s2: WARNING: relation "public.dist_table" does not exist
|
||||
s2: WARNING: relation "public.dist_table" does not exist
|
||||
step s2-truncate: <... completed>
|
||||
ERROR: failure on connection marked as essential: localhost:xxxxx
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a|b
|
||||
---------------------------------------------------------------------
|
||||
1|2
|
||||
3|4
|
||||
5|6
|
||||
(3 rows)
|
||||
(0 rows)
|
||||
|
||||
restore_isolation_tester_func
|
||||
---------------------------------------------------------------------
|
||||
|
|
|
@ -73,6 +73,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
|
|||
|
||||
-- make sure that pg_class queries do not get blocked on table locks
|
||||
begin;
|
||||
SET LOCAL citus.enable_ddl_propagation TO OFF;
|
||||
lock table test_table in access exclusive mode;
|
||||
prepare transaction 'take-aggressive-lock';
|
||||
-- shards are hidden when using psql as application_name
|
||||
|
|
|
@ -94,5 +94,6 @@ test: isolation_metadata_sync_deadlock
|
|||
test: isolation_replicated_dist_on_mx
|
||||
test: isolation_replicate_reference_tables_to_coordinator
|
||||
test: isolation_multiuser_locking
|
||||
test: isolation_acquire_distributed_locks
|
||||
|
||||
test: isolation_check_mx
|
||||
|
|
|
@ -61,6 +61,7 @@ test: locally_execute_intermediate_results
|
|||
test: multi_mx_alter_distributed_table
|
||||
test: update_colocation_mx
|
||||
test: resync_metadata_with_sequences
|
||||
test: distributed_locks
|
||||
|
||||
# should be executed sequentially because it modifies metadata
|
||||
test: local_shard_execution_dropped_column
|
||||
|
|
|
@ -464,6 +464,7 @@ push(@pgOptions, "citus.shard_replication_factor=2");
|
|||
push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}");
|
||||
push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
|
||||
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
|
||||
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
|
||||
|
||||
# Some tests look at shards in pg_class, make sure we can usually see them:
|
||||
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
#include "isolation_mx_common.include.spec"
|
||||
|
||||
setup
|
||||
{
|
||||
SELECT citus_set_coordinator_host('localhost', 57636);
|
||||
|
||||
CREATE TABLE dist_table(a int);
|
||||
CREATE TABLE citus_local_table(a int);
|
||||
CREATE TABLE local_table(a int);
|
||||
CREATE TABLE ref_table(a int);
|
||||
|
||||
CREATE TABLE partitioned_table(a int)
|
||||
PARTITION BY RANGE(a);
|
||||
|
||||
CREATE TABLE partition_1 PARTITION OF partitioned_table
|
||||
FOR VALUES FROM (1) TO (11);
|
||||
|
||||
CREATE TABLE partition_2 PARTITION OF partitioned_table
|
||||
FOR VALUES FROM (11) TO (21);
|
||||
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
SELECT create_reference_table('ref_table');
|
||||
SELECT citus_add_local_table_to_metadata('citus_local_table');
|
||||
SELECT create_distributed_table('partitioned_table', 'a');
|
||||
|
||||
CREATE VIEW sub_view(a) AS
|
||||
SELECT 2 * a AS a
|
||||
FROM ref_table;
|
||||
|
||||
CREATE VIEW main_view AS
|
||||
SELECT t1.a a1, t2.a a2, t3.a a3
|
||||
FROM dist_table t1
|
||||
JOIN citus_local_table t2 ON t1.a = t2.a
|
||||
JOIN sub_view t3 ON t2.a = t3.a;
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO citus_local_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO local_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO ref_table SELECT n FROM generate_series(1, 5) n;
|
||||
INSERT INTO partitioned_table SELECT n FROM generate_series(8, 12) n;
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP VIEW main_view;
|
||||
DROP VIEW sub_view;
|
||||
DROP TABLE dist_table;
|
||||
DROP TABLE citus_local_table;
|
||||
DROP TABLE local_table;
|
||||
DROP TABLE ref_table;
|
||||
DROP TABLE partitioned_table;
|
||||
|
||||
SELECT citus_remove_node('localhost', 57636);
|
||||
|
||||
SELECT citus_internal.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
// coordinator session
|
||||
session "coor"
|
||||
|
||||
step "coor-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-dist-table"
|
||||
{
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-dist-table-nowait"
|
||||
{
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT;
|
||||
}
|
||||
|
||||
step "coor-acquire-weak-lock-on-dist-table"
|
||||
{
|
||||
LOCK dist_table IN ACCESS SHARE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-view"
|
||||
{
|
||||
LOCK main_view IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-only-view"
|
||||
{
|
||||
LOCK ONLY main_view IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-view-nowait"
|
||||
{
|
||||
LOCK main_view IN ACCESS EXCLUSIVE MODE NOWAIT;
|
||||
}
|
||||
|
||||
step "coor-lock-all"
|
||||
{
|
||||
LOCK dist_table, citus_local_table, ref_table, main_view, sub_view, local_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-read-dist-table"
|
||||
{
|
||||
SELECT COUNT(*) FROM dist_table;
|
||||
}
|
||||
|
||||
step "coor-read-ref-table"
|
||||
{
|
||||
SELECT COUNT(*) FROM ref_table;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-partitioned-table"
|
||||
{
|
||||
LOCK partitioned_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax"
|
||||
{
|
||||
LOCK partitioned_table * IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-only-partitioned-table"
|
||||
{
|
||||
LOCK ONLY partitioned_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-acquire-aggresive-lock-on-ref-table"
|
||||
{
|
||||
LOCK ref_table IN ACCESS EXCLUSIVE MODE;
|
||||
}
|
||||
|
||||
step "coor-rollback"
|
||||
{
|
||||
ROLLBACK;
|
||||
}
|
||||
|
||||
// worker 1 xact session
|
||||
session "w1"
|
||||
|
||||
step "w1-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
}
|
||||
|
||||
step "w1-begin"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "w1-read-dist-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM dist_table');
|
||||
}
|
||||
|
||||
step "w1-read-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM ref_table');
|
||||
}
|
||||
|
||||
step "w1-read-citus-local-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM citus_local_table');
|
||||
}
|
||||
|
||||
step "w1-acquire-aggressive-lock-dist-table" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
|
||||
}
|
||||
|
||||
step "w1-lock-reference-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK ref_table IN ACCESS EXCLUSIVE MODE');
|
||||
}
|
||||
|
||||
step "w1-read-partitioned-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partitioned_table');
|
||||
}
|
||||
|
||||
step "w1-read-partition-of-partitioned-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partition_1');
|
||||
}
|
||||
|
||||
step "w1-read-main-view"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM main_view');
|
||||
}
|
||||
|
||||
step "w1-rollback"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
|
||||
}
|
||||
|
||||
step "w1-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
// worker 2 xact session
|
||||
session "w2"
|
||||
|
||||
step "w2-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
}
|
||||
|
||||
step "w2-begin"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "w2-acquire-aggressive-lock-dist-table" {
|
||||
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
|
||||
}
|
||||
|
||||
step "w2-rollback"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
|
||||
}
|
||||
|
||||
step "w2-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "coor-read-dist-table" "w1-rollback" "w1-stop-connection"
|
||||
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-dist-table-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "w1-start-session-level-connection" "w1-begin" "w2-start-session-level-connection" "w2-begin" "w1-acquire-aggressive-lock-dist-table" "w2-acquire-aggressive-lock-dist-table" "w1-rollback" "w1-read-dist-table" "w2-rollback" "w1-stop-connection" "w2-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-weak-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "w1-start-session-level-connection" "w1-begin" "w1-lock-reference-table" "coor-begin" "coor-read-ref-table" "w1-rollback" "coor-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-view-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-lock-all" "w1-start-session-level-connection" "w1-begin" "w1-read-citus-local-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
||||
permutation "coor-begin" "coor-acquire-aggresive-lock-on-ref-table" "w1-start-session-level-connection" "w1-begin" "w1-read-main-view" "coor-rollback" "w1-rollback" "w1-stop-connection"
|
|
@ -2,17 +2,29 @@
|
|||
|
||||
setup
|
||||
{
|
||||
CREATE TABLE truncate_table(id integer, value integer);
|
||||
CREATE TABLE truncate_table(id integer, value integer, PRIMARY KEY(id));
|
||||
CREATE TABLE data_table(id integer, value integer);
|
||||
CREATE TABLE referencing_table_1 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES truncate_table(id));
|
||||
CREATE TABLE referencing_table_2 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES referencing_table_1(id));
|
||||
|
||||
SELECT create_distributed_table('truncate_table', 'id');
|
||||
SELECT create_distributed_table('data_table', 'id');
|
||||
SELECT create_distributed_table('referencing_table_1', 'id');
|
||||
SELECT create_distributed_table('referencing_table_2', 'id');
|
||||
|
||||
COPY truncate_table FROM PROGRAM 'echo 1, 10 && echo 2, 20 && echo 3, 30 && echo 4, 40 && echo 5, 50' WITH CSV;
|
||||
COPY data_table FROM PROGRAM 'echo 20, 20 && echo 30, 30 && echo 40, 40 && echo 50, 50' WITH CSV;
|
||||
}
|
||||
|
||||
// Create and use UDF to close the connection opened in the setup step. Also return the cluster
|
||||
// back to the initial state.
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS truncate_table CASCADE;
|
||||
SELECT citus_internal.restore_isolation_tester_func();
|
||||
DROP TABLE IF EXISTS data_table;
|
||||
DROP TABLE IF EXISTS referencing_table_2;
|
||||
DROP TABLE IF EXISTS referencing_table_1;
|
||||
DROP TABLE IF EXISTS truncate_table CASCADE;
|
||||
SELECT citus_internal.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
@ -36,7 +48,7 @@ step "s1-begin-on-worker"
|
|||
|
||||
step "s1-truncate"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
}
|
||||
|
||||
step "s1-select"
|
||||
|
@ -46,7 +58,7 @@ step "s1-select"
|
|||
|
||||
step "s1-insert-select"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
|
||||
}
|
||||
|
||||
step "s1-delete"
|
||||
|
@ -56,7 +68,7 @@ step "s1-delete"
|
|||
|
||||
step "s1-copy"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV');
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
|
||||
}
|
||||
|
||||
step "s1-alter"
|
||||
|
@ -101,7 +113,7 @@ step "s2-begin-on-worker"
|
|||
|
||||
step "s2-truncate"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
|
||||
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
|
||||
}
|
||||
|
||||
step "s2-commit-worker"
|
||||
|
@ -122,6 +134,11 @@ step "s3-select-count"
|
|||
SELECT COUNT(*) FROM truncate_table;
|
||||
}
|
||||
|
||||
step "s3-select-count-from-ref-table"
|
||||
{
|
||||
SELECT COUNT(*) FROM referencing_table_2;
|
||||
}
|
||||
|
||||
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
|
||||
|
||||
|
@ -131,3 +148,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete"
|
|||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
|
||||
permutation "s1-begin" "s1-alter" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit" "s2-commit-worker" "s2-stop-connection" "s3-select-count"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s3-select-count-from-ref-table" "s1-commit-worker" "s1-stop-connection"
|
||||
|
|
|
@ -0,0 +1,149 @@
|
|||
CREATE SCHEMA distribute_lock_tests;
|
||||
SET search_path TO distribute_lock_tests;
|
||||
|
||||
SET citus.next_shard_id TO 10000;
|
||||
|
||||
CREATE TABLE dist_table(a int);
|
||||
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
|
||||
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
|
||||
|
||||
-- Test acquiring lock outside transaction
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
|
||||
-- Test acquiring lock inside procedure
|
||||
DO $$
|
||||
BEGIN
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
END;
|
||||
$$;
|
||||
|
||||
-- Try all valid lock options; also try omitting the optional TABLE keyword.
|
||||
BEGIN TRANSACTION;
|
||||
LOCK TABLE dist_table IN ACCESS SHARE MODE;
|
||||
LOCK dist_table IN ROW SHARE MODE;
|
||||
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN SHARE MODE;
|
||||
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN EXCLUSIVE MODE;
|
||||
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test that when the user does not have the required permissions to lock
|
||||
-- the locks are not forwarded to the workers
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE read_only_user WITH LOGIN;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
||||
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE read_only_user WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
|
||||
GRANT SELECT ON dist_table TO read_only_user;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
RESET client_min_messages;
|
||||
|
||||
SET ROLE read_only_user;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
SET citus.log_remote_commands TO OFF;
|
||||
RESET ROLE;
|
||||
|
||||
-- test that user with view permissions can lock the tables
|
||||
-- which the view is built on
|
||||
CREATE VIEW myview AS SELECT * FROM dist_table;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE user_with_view_permissions WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
|
||||
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
||||
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE ROLE user_with_view_permissions WITH LOGIN;
|
||||
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
|
||||
GRANT ALL ON myview TO user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
RESET client_min_messages;
|
||||
|
||||
SET ROLE TO user_with_view_permissions;
|
||||
|
||||
BEGIN;
|
||||
LOCK myview IN ACCESS EXCLUSIVE MODE;
|
||||
SELECT run_command_on_workers($$
|
||||
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
|
||||
$$);
|
||||
|
||||
ROLLBACK;
|
||||
|
||||
RESET ROLE;
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO distribute_lock_tests;
|
||||
|
||||
-- Test trying to lock from a worker when the coordinator is not in the metadata
|
||||
SET citus.allow_unsafe_locks_from_workers TO 'off';
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Verify that the same restriction does not apply to worker local tables
|
||||
CREATE TABLE local_table(a int);
|
||||
|
||||
-- Verify that no locks will be distributed for the local lock
|
||||
SET citus.log_remote_commands TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK local_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.log_remote_commands;
|
||||
|
||||
-- Cleanup local table
|
||||
DROP TABLE local_table;
|
||||
|
||||
-- Test that setting the guc to 'on' will allow the lock from workers
|
||||
SET citus.allow_unsafe_locks_from_workers TO 'on';
|
||||
BEGIN;
|
||||
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test locking a shard
|
||||
SET citus.enable_manual_changes_to_shards TO OFF;
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
|
||||
SET citus.enable_manual_changes_to_shards TO ON;
|
||||
|
||||
BEGIN;
|
||||
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
|
||||
ROLLBACK;
|
||||
|
||||
RESET citus.enable_manual_changes_to_shards;
|
||||
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA distribute_lock_tests CASCADE;
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
DROP ROLE read_only_user;
|
||||
DROP ROLE user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
SELECT run_command_on_workers($$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
DROP USER read_only_user;
|
||||
DROP USER user_with_view_permissions;
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$$);
|
|
@ -43,7 +43,9 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
|
|||
|
||||
-- make sure that pg_class queries do not get blocked on table locks
|
||||
begin;
|
||||
SET LOCAL citus.enable_ddl_propagation TO OFF;
|
||||
lock table test_table in access exclusive mode;
|
||||
|
||||
prepare transaction 'take-aggressive-lock';
|
||||
|
||||
-- shards are hidden when using psql as application_name
|
||||
|
|
Loading…
Reference in New Issue