Add distributing lock command support

(cherry picked from commit 4731630741)
release-11.0-onder-23-may
gledis69 2022-05-20 10:39:22 +03:00
parent 3f282c660b
commit 909b72b027
24 changed files with 2116 additions and 143 deletions

View File

@ -44,7 +44,6 @@
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
static List * TruncateTaskList(Oid relationId);
@ -244,7 +243,13 @@ PreprocessTruncateStatement(TruncateStmt *truncateStatement)
ErrorIfUnsupportedTruncateStmt(truncateStatement);
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
LockTruncatedRelationMetadataInWorkers(truncateStatement);
uint32 lockAcquiringMode = truncateStatement->behavior == DROP_CASCADE ?
DIST_LOCK_REFERENCING_TABLES :
DIST_LOCK_DEFAULT;
AcquireDistributedLockOnRelations(truncateStatement->relations, AccessExclusiveLock,
lockAcquiringMode);
}
@ -341,69 +346,3 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
}
}
}
/*
* LockTruncatedRelationMetadataInWorkers determines if distributed
* lock is necessary for truncated relations, and acquire locks.
*
* LockTruncatedRelationMetadataInWorkers handles distributed locking
* of truncated tables before standard utility takes over.
*
* Actual distributed truncation occurs inside truncate trigger.
*
* This is only for distributed serialization of truncate commands.
* The function assumes that there is no foreign key relation between
* non-distributed and distributed relations.
*/
static void
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
{
/* nothing to do if there is no metadata at worker nodes */
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync)
{
return;
}
List *distributedRelationList = NIL;
/*
* this is used to enforce the lock order:
* [...TruncatedTables], [...TablesTruncatedFromCascadingOnTruncatedTables]
*/
List *referencingRelationIds = NIL;
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, truncateStatement->relations)
{
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
if (!IsCitusTable(relationId))
{
continue;
}
distributedRelationList = list_append_unique_oid(distributedRelationList,
relationId);
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
Oid referencingRelationId = InvalidOid;
foreach_oid(referencingRelationId, referencingTableList)
{
referencingRelationIds = lappend_oid(referencingRelationIds,
referencingRelationId);
}
}
distributedRelationList = list_concat_unique_oid(distributedRelationList,
referencingRelationIds);
if (distributedRelationList != NIL)
{
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
}
}

View File

@ -164,7 +164,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
parsetree = pstmt->utilityStmt;
if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, LockStmt) ||
IsA(parsetree, ListenStmt) ||
IsA(parsetree, NotifyStmt) ||
IsA(parsetree, ExecuteStmt) ||
@ -505,6 +504,18 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
PreprocessTruncateStatement((TruncateStmt *) parsetree);
}
if (IsA(parsetree, LockStmt))
{
/*
* PreprocessLockStatement might lock the relations locally if the
* node executing the command is in pg_dist_node. Even though the process
* utility will re-acquire the locks across the same relations if the node
* is in the metadata (in the pg_dist_node table) that should not be a problem,
* plus it ensures consistent locking order between the nodes.
*/
PreprocessLockStatement((LockStmt *) parsetree, context);
}
/*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary.

View File

@ -251,10 +251,7 @@ FilterNameListForDistributedViews(List *viewNamesList, bool missing_ok)
continue;
}
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
if (IsObjectDistributed(&viewAddress))
if (IsViewDistributed(viewOid))
{
distributedViewNames = lappend(distributedViewNames,
possiblyQualifiedViewName);
@ -427,6 +424,21 @@ AlterViewOwnerCommand(Oid viewOid)
}
/*
* IsViewDistributed checks if a view is distributed
*/
bool
IsViewDistributed(Oid viewOid)
{
Assert(get_rel_relkind(viewOid) == RELKIND_VIEW);
ObjectAddress viewAddress = { 0 };
ObjectAddressSet(viewAddress, RelationRelationId, viewOid);
return IsObjectDistributed(&viewAddress);
}
/*
* PreprocessAlterViewStmt is invoked for alter view statements.
*/

View File

@ -74,7 +74,7 @@
#include "distributed/shared_library_init.h"
#include "distributed/statistics_collection.h"
#include "distributed/subplan_execution.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
#include "distributed/worker_log_messages.h"
@ -601,6 +601,26 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.allow_unsafe_locks_from_workers",
gettext_noop("Enables acquiring a distributed lock from a worker "
"when the coordinator is not in the metadata"),
gettext_noop("Set to false by default. If set to true, enables "
"acquiring a distributed lock from a worker "
"when the coordinator is not in the metadata. "
"This type of lock is unsafe because the worker will not be "
"able to lock the coordinator; the coordinator will be able to "
"intialize distributed operations on the resources locked "
"by the worker. This can lead to concurrent operations from the "
"coordinator and distributed deadlocks since the coordinator "
"and the workers would not acquire locks across the same nodes "
"in the same order."),
&EnableAcquiringUnsafeLockFromWorkers,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.binary_worker_copy_format",
gettext_noop("Use the binary worker copy format."),

View File

@ -264,3 +264,24 @@ GenerateListFromElement(void *listElement, int listLength)
return list;
}
/*
* list_filter_oid filters a list of oid-s based on a keepElement
* function
*/
List *
list_filter_oid(List *list, bool (*keepElement)(Oid element))
{
List *result = NIL;
Oid element = InvalidOid;
foreach_oid(element, list)
{
if (keepElement(element))
{
result = lappend_oid(result, element);
}
}
return result;
}

View File

@ -21,6 +21,7 @@
#include "catalog/namespace.h"
#include "commands/tablecmds.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
@ -38,12 +39,14 @@
#include "distributed/worker_protocol.h"
#include "distributed/version_compat.h"
#include "distributed/local_executor.h"
#include "distributed/worker_shard_visibility.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/varlena.h"
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists(%s, %s);"
#define LOCK_RELATION_IF_EXISTS \
"SELECT pg_catalog.lock_relation_if_exists(%s, %s);"
/* static definition and declarations */
struct LockModeToStringType
@ -70,6 +73,17 @@ static const struct LockModeToStringType lockmode_to_string_map[] = {
static const int lock_mode_to_string_map_count = sizeof(lockmode_to_string_map) /
sizeof(lockmode_to_string_map[0]);
/*
* LockRelationRecord holds the oid of a relation to be locked
* and a boolean inh to determine whether its decendants
* should be locked as well
*/
typedef struct LockRelationRecord
{
Oid relationId;
bool inh;
} LockRelationRecord;
/* local function forward declarations */
static LOCKMODE IntToLockMode(int mode);
@ -91,6 +105,8 @@ PG_FUNCTION_INFO_V1(lock_shard_metadata);
PG_FUNCTION_INFO_V1(lock_shard_resources);
PG_FUNCTION_INFO_V1(lock_relation_if_exists);
/* Config variable managed via guc.c */
bool EnableAcquiringUnsafeLockFromWorkers = false;
/*
* lock_shard_metadata allows the shard distribution metadata to be locked
@ -1080,14 +1096,110 @@ EnsureCanAcquireLock(Oid relationId, LOCKMODE lockMode)
/*
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
* for given list of relations ids. Worker node list is sorted so that the lock
* is acquired in the same order regardless of which node it was run on. Notice
* that no lock is acquired on coordinator node if the coordinator is not added
* to the metadata.
* CreateLockTerminationString creates a string that can be appended to the
* end of a partial lock command to properly terminate the command
*/
void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
static const char *
CreateLockTerminationString(const char *lockModeText, bool nowait)
{
StringInfo lockTerminationStringInfo = makeStringInfo();
appendStringInfo(lockTerminationStringInfo, nowait ? " IN %s MODE NOWAIT;\n" :
" IN %s MODE;\n", lockModeText);
return lockTerminationStringInfo->data;
}
/*
* FinishLockCommandIfNecessary appends the lock termination string if the lock command is partial.
* Sets the partialLockCommand flag to false
*/
static void
FinishLockCommandIfNecessary(StringInfo lockCommand, const char *lockTerminationString,
bool *partialLockCommand)
{
if (*partialLockCommand)
{
appendStringInfo(lockCommand, "%s", lockTerminationString);
}
*partialLockCommand = false;
}
/*
* LockRelationRecordListMember checks if a relation id is present in the
* LockRelationRecord list
*/
static bool
LockRelationRecordListMember(List *lockRelationRecordList, Oid relationId)
{
LockRelationRecord *record = NULL;
foreach_ptr(record, lockRelationRecordList)
{
if (record->relationId == relationId)
{
return true;
}
}
return false;
}
/*
* MakeLockRelationRecord makes a LockRelationRecord using the relation oid
* and the inh boolean while properly allocating the structure
*/
static LockRelationRecord *
MakeLockRelationRecord(Oid relationId, bool inh)
{
LockRelationRecord *lockRelationRecord = palloc(sizeof(LockRelationRecord));
lockRelationRecord->relationId = relationId;
lockRelationRecord->inh = inh;
return lockRelationRecord;
}
/*
* ConcatLockRelationRecordList concats a list of LockRelationRecord with
* another list of LockRelationRecord created from a list of relation oid-s
* which are not present in the first list and an inh bool which will be
* applied across all LockRelationRecords
*/
static List *
ConcatLockRelationRecordList(List *lockRelationRecordList, List *relationOidList, bool
inh)
{
List *constructedList = NIL;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationOidList)
{
if (!LockRelationRecordListMember(lockRelationRecordList, relationId))
{
LockRelationRecord *record = MakeLockRelationRecord(relationId, inh);
constructedList = lappend(constructedList, (void *) record);
}
}
return list_concat(lockRelationRecordList, constructedList);
}
/*
* AcquireDistributedLockOnRelations_Internal acquire a distributed lock on worker nodes
* for given list of relations ids. Worker node list is sorted so that the lock
* is acquired in the same order regardless of which node it was run on.
*
* A nowait flag is used to require the locks to be available immediately
* and if that is not the case, an error will be thrown
*
* Notice that no validation or filtering is done on the relationIds, that is the responsibility
* of this function's caller.
*/
static void
AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
LOCKMODE lockMode, bool nowait)
{
const char *lockModeText = LockModeToLockModeText(lockMode);
@ -1103,22 +1215,15 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
*/
bool startedLockCommand = false;
/* create a lock termination string used to terminate a partial lock command */
const char *lockTerminationString = CreateLockTerminationString(lockModeText, nowait);
int lockedRelations = 0;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
LockRelationRecord *lockRelationRecord;
foreach_ptr(lockRelationRecord, lockRelationRecordList)
{
/*
* we want to prevent under privileged users to trigger establishing connections,
* that's why we have this additional check. Otherwise, we'd get the errors as
* soon as we execute the command on any node
*/
EnsureCanAcquireLock(relationId, lockMode);
if (!ShouldSyncTableMetadata(relationId))
{
continue;
}
Oid relationId = lockRelationRecord->relationId;
bool lockDescendants = lockRelationRecord->inh;
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
/*
@ -1127,12 +1232,18 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
*/
if (get_rel_relkind(relationId) == RELKIND_FOREIGN_TABLE)
{
/* finish the partial lock statement */
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
startedLockCommand = false;
}
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
&startedLockCommand);
/*
* The user should not be able to trigger this codepath
* with nowait = true or lockDescendants = false since the
* only way to do that is calling LOCK * IN * MODE NOWAIT;
* and LOCK ONLY * IN * MODE; respectively but foreign tables
* cannot be locked with LOCK command as of pg14
*/
Assert(nowait == false);
Assert(lockDescendants == true);
/* use lock_relation_if_exits to lock foreign table */
appendStringInfo(lockRelationsCommand, LOCK_RELATION_IF_EXISTS,
@ -1143,12 +1254,16 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
else if (startedLockCommand)
{
/* append relation to partial lock statement */
appendStringInfo(lockRelationsCommand, ", %s", qualifiedRelationName);
appendStringInfo(lockRelationsCommand, ",%s%s",
lockDescendants ? " " : " ONLY ",
qualifiedRelationName);
}
else
{
/* start a new partial lock statement */
appendStringInfo(lockRelationsCommand, "LOCK %s", qualifiedRelationName);
appendStringInfo(lockRelationsCommand, "LOCK%s%s",
lockDescendants ? " " : " ONLY ",
qualifiedRelationName);
startedLockCommand = true;
}
@ -1160,14 +1275,11 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
return;
}
if (startedLockCommand)
{
appendStringInfo(lockRelationsCommand, " IN %s MODE;\n", lockModeText);
}
FinishLockCommandIfNecessary(lockRelationsCommand, lockTerminationString,
&startedLockCommand);
appendStringInfo(lockRelationsCommand, ENABLE_DDL_PROPAGATION);
const char *lockCommand = lockRelationsCommand->data;
char *lockCommand = lockRelationsCommand->data;
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES, NoLock);
@ -1180,6 +1292,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
const char *currentUser = CurrentUserName();
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
@ -1192,6 +1305,126 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
continue;
}
SendCommandToWorker(nodeName, nodePort, lockCommand);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
currentUser, list_make1(
lockCommand));
}
}
/*
* AcquireDistributedLockOnRelations filters relations before passing them to
* AcquireDistributedLockOnRelations_Internal to acquire the locks.
*
* Only tables, views, and foreign tables can be locked with this function. Other relations
* will cause an error.
*
* Skips non distributed relations.
* configs parameter is used to configure what relation will be locked and if the lock
* should throw an error if it cannot be acquired immediately
*/
void
AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode, uint32 configs)
{
if (!ClusterHasKnownMetadataWorkers() || !EnableMetadataSync || !EnableDDLPropagation)
{
return;
}
/* used to store the relations that will be locked */
List *distributedRelationRecordsList = NIL;
bool nowait = (configs & DIST_LOCK_NOWAIT) > 0;
RangeVar *rangeVar = NULL;
foreach_ptr(rangeVar, relationList)
{
Oid relationId = RangeVarGetRelidExtended(rangeVar, AccessShareLock, nowait ?
RVR_NOWAIT : 0, NULL, NULL);
LockRelationRecord *lockRelationRecord = MakeLockRelationRecord(relationId,
rangeVar->inh);
/*
* Note that allowing the user to lock shards could lead to
* distributed deadlocks due to shards not being locked when
* a distributed table is locked.
* However, because citus.enable_manual_changes_to_shards
* is a guc which is not visible by default, whoever is using this
* guc will hopefully know what they're doing and avoid such scenarios.
*/
ErrorIfIllegallyChangingKnownShard(relationId);
/*
* we want to prevent under privileged users to trigger establishing connections,
* that's why we have this additional check. Otherwise, we'd get the errors as
* soon as we execute the command on any node
*/
EnsureCanAcquireLock(relationId, lockMode);
bool isView = get_rel_relkind(relationId) == RELKIND_VIEW;
if ((isView && !IsViewDistributed(relationId)) ||
(!isView && !ShouldSyncTableMetadata(relationId)))
{
continue;
}
if (!LockRelationRecordListMember(distributedRelationRecordsList, relationId))
{
distributedRelationRecordsList = lappend(distributedRelationRecordsList,
(void *) lockRelationRecord);
}
if ((configs & DIST_LOCK_REFERENCING_TABLES) > 0)
{
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
/* remove the relations which should not be synced */
referencingTableList = list_filter_oid(referencingTableList,
&ShouldSyncTableMetadata);
distributedRelationRecordsList = ConcatLockRelationRecordList(
distributedRelationRecordsList, referencingTableList, true);
}
}
if (distributedRelationRecordsList != NIL)
{
if (!IsCoordinator() && !CoordinatorAddedAsWorkerNode() &&
!EnableAcquiringUnsafeLockFromWorkers)
{
ereport(ERROR,
(errmsg(
"Cannot acquire a distributed lock from a worker node since the "
"coordinator is not in the metadata."),
errhint(
"Either run this command on the coordinator or add the coordinator "
"in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);\n"
"Alternatively, though it is not recommended, you can allow this command by running: "
"SET citus.allow_unsafe_locks_from_workers TO 'on';")));
}
AcquireDistributedLockOnRelations_Internal(distributedRelationRecordsList,
lockMode, nowait);
}
}
/**
* PreprocessLockStatement makes sure that the lock is allowed
* before calling AcquireDistributedLockOnRelations on the locked tables/views
* with flags DIST_LOCK_VIEWS_RECUR and DIST_LOCK_NOWAIT if nowait is true for
* the lock statement
*/
void
PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context)
{
bool isTopLevel = context == PROCESS_UTILITY_TOPLEVEL;
RequireTransactionBlock(isTopLevel, "LOCK TABLE");
uint32 nowaitFlag = stmt->nowait ? DIST_LOCK_NOWAIT : 0;
AcquireDistributedLockOnRelations(stmt->relations, stmt->mode, nowaitFlag);
}

View File

@ -536,6 +536,7 @@ extern char * CreateViewDDLCommand(Oid viewOid);
extern char * AlterViewOwnerCommand(Oid viewOid);
extern char * DeparseViewStmt(Node *node);
extern char * DeparseDropViewStmt(Node *node);
extern bool IsViewDistributed(Oid viewOid);
extern List * CreateViewDDLCommandsIdempotent(Oid viewOid);
extern List * PreprocessAlterViewStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext);

View File

@ -178,5 +178,6 @@ extern List * ListTake(List *pointerList, int size);
extern void * safe_list_nth(const List *list, int index);
extern List * GeneratePositiveIntSequenceList(int upTo);
extern List * GenerateListFromElement(void *listElement, int listLength);
extern List * list_filter_oid(List *list, bool (*keepElement)(Oid element));
#endif /* CITUS_LISTUTILS_H */

View File

@ -16,6 +16,7 @@
#include "distributed/worker_transaction.h"
#include "nodes/pg_list.h"
#include "storage/lock.h"
#include "tcop/utility.h"
/*
@ -109,6 +110,26 @@ typedef enum CitusOperations
(uint32) 0, \
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP)
/*
* DistLockConfigs are used to configure the locking behaviour of AcquireDistributedLockOnRelations
*/
enum DistLockConfigs
{
/*
* lock citus tables
*/
DIST_LOCK_DEFAULT = 0,
/*
* lock tables that refer to locked citus tables with a foreign key
*/
DIST_LOCK_REFERENCING_TABLES = 1,
/*
* throw an error if the lock is not immediately available
*/
DIST_LOCK_NOWAIT = 2
};
/* Lock shard/relation metadata for safe modifications */
extern void LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode);
@ -117,7 +138,6 @@ extern bool TryLockPlacementCleanup(void);
extern void EnsureShardOwner(uint64 shardId, bool missingOk);
extern void LockShardListMetadataOnWorkers(LOCKMODE lockmode, List *shardIntervalList);
extern void BlockWritesToShardList(List *shardList);
extern void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
/* Lock shard/relation metadata of the referenced reference table if exists */
extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId,
@ -152,5 +172,10 @@ extern void LockParentShardResourceIfPartition(List *shardIntervalList,
/* Lock mode translation between text and enum */
extern LOCKMODE LockModeTextToLockMode(const char *lockModeName);
extern const char * LockModeToLockModeText(LOCKMODE lockMode);
extern void AcquireDistributedLockOnRelations(List *relationList, LOCKMODE lockMode,
uint32 configs);
extern void PreprocessLockStatement(LockStmt *stmt, ProcessUtilityContext context);
extern bool EnableAcquiringUnsafeLockFromWorkers;
#endif /* RESOURCE_LOCK_H */

View File

@ -214,10 +214,10 @@ def save_regression_diff(name, output_dir):
shutil.move(path, new_file_path)
def sync_metadata_to_workers(pg_path, worker_ports, coordinator_port):
def stop_metadata_to_workers(pg_path, worker_ports, coordinator_port):
for port in worker_ports:
command = (
"SELECT * from start_metadata_sync_to_node('localhost', {port});".format(
"SELECT * from stop_metadata_sync_to_node('localhost', {port});".format(
port=port
)
)
@ -286,8 +286,8 @@ def initialize_citus_cluster(bindir, datadir, settings, config):
start_databases(bindir, datadir, config.node_name_to_ports, config.name, config.env_variables)
create_citus_extension(bindir, config.node_name_to_ports.values())
add_workers(bindir, config.worker_ports, config.coordinator_port())
if config.is_mx:
sync_metadata_to_workers(bindir, config.worker_ports, config.coordinator_port())
if not config.is_mx:
stop_metadata_to_workers(bindir, config.worker_ports, config.coordinator_port())
if config.add_coordinator_to_metadata:
add_coordinator_to_metadata(bindir, config.coordinator_port())
config.setup_steps()

View File

@ -175,7 +175,6 @@ class CitusUpgradeConfig(CitusBaseClusterConfig):
self.user = SUPER_USER_NAME
self.mixed_mode = arguments["--mixed"]
self.fixed_port = 57635
self.is_mx = False
class PostgresConfig(CitusDefaultClusterConfig):
@ -324,6 +323,9 @@ class CitusNonMxClusterConfig(CitusDefaultClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.is_mx = False
# citus does not support distributed functions
# when metadata is not synced
self.skip_tests = ["function_create", "functions"]
class PGUpgradeConfig(CitusBaseClusterConfig):

View File

@ -0,0 +1,155 @@
CREATE SCHEMA distribute_lock_tests;
SET search_path TO distribute_lock_tests;
SET citus.next_shard_id TO 10000;
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
-- Test acquiring lock outside transaction
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: LOCK TABLE can only be used in transaction blocks
-- Test acquiring lock inside procedure
DO $$
BEGIN
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
END;
$$;
-- Try all valid lock options; also try omitting the optional TABLE keyword.
BEGIN TRANSACTION;
LOCK TABLE dist_table IN ACCESS SHARE MODE;
LOCK dist_table IN ROW SHARE MODE;
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE MODE;
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN EXCLUSIVE MODE;
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test that when the user does not have the required permissions to lock
-- the locks are not forwarded to the workers
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
GRANT SELECT ON dist_table TO read_only_user;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE read_only_user;
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: permission denied for table dist_table
ROLLBACK;
SET citus.log_remote_commands TO OFF;
RESET ROLE;
-- test that user with view permissions can lock the tables
-- which the view is built on
CREATE VIEW myview AS SELECT * FROM dist_table;
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE TO user_with_view_permissions;
BEGIN;
LOCK myview IN ACCESS EXCLUSIVE MODE;
SELECT run_command_on_workers($$
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,AccessExclusiveLock)
(localhost,57638,t,AccessExclusiveLock)
(2 rows)
ROLLBACK;
RESET ROLE;
\c - - - :worker_1_port
SET search_path TO distribute_lock_tests;
-- Test trying to lock from a worker when the coordinator is not in the metadata
SET citus.allow_unsafe_locks_from_workers TO 'off';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ERROR: Cannot acquire a distributed lock from a worker node since the coordinator is not in the metadata.
HINT: Either run this command on the coordinator or add the coordinator in the metadata by using: SELECT citus_set_coordinator_host('<hostname>', <port>);
Alternatively, though it is not recommended, you can allow this command by running: SET citus.allow_unsafe_locks_from_workers TO 'on';
ROLLBACK;
-- Verify that the same restriction does not apply to worker local tables
CREATE TABLE local_table(a int);
-- Verify that no locks will be distributed for the local lock
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK local_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.log_remote_commands;
-- Cleanup local table
DROP TABLE local_table;
-- Test that setting the guc to 'on' will allow the lock from workers
SET citus.allow_unsafe_locks_from_workers TO 'on';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test locking a shard
SET citus.enable_manual_changes_to_shards TO OFF;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ERROR: cannot modify "dist_table_10000" because it is a shard of a distributed table
HINT: Use the distributed table or set citus.enable_manual_changes_to_shards to on to modify shards directly
ROLLBACK;
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
SET citus.enable_manual_changes_to_shards TO ON;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.enable_manual_changes_to_shards;
\c - - - :master_port
DROP SCHEMA distribute_lock_tests CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table distribute_lock_tests.dist_table
drop cascades to view distribute_lock_tests.myview
SET citus.enable_ddl_propagation TO OFF;
DROP ROLE read_only_user;
DROP ROLE user_with_view_permissions;
RESET citus.enable_ddl_propagation;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
DROP USER read_only_user;
DROP USER user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,SET)
(localhost,57638,t,SET)
(2 rows)

File diff suppressed because it is too large Load Diff

View File

@ -129,8 +129,8 @@ step s2-view-worker:
query |state |wait_event_type|wait_event|usename |datname
---------------------------------------------------------------------
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500777 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
UPDATE public.ref_table_1500877 ref_table SET a = (a OPERATOR(pg_catalog.+) 1)|idle in transaction|Client |ClientRead|postgres|regression
(2 rows)
step s2-end:

View File

@ -18,7 +18,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -42,7 +42,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -138,7 +138,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -210,7 +210,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-insert-select:
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table');
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -234,7 +234,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -330,7 +330,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -402,7 +402,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s1-copy:
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV');
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
@ -426,7 +426,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -504,7 +504,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit:
COMMIT;
@ -587,7 +587,7 @@ run_commands_on_session_level_connection_to_node
(1 row)
step s2-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -640,3 +640,59 @@ restore_isolation_tester_func
(1 row)
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-truncate s3-select-count-from-ref-table s1-commit-worker s1-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s1-truncate:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select-count-from-ref-table:
SELECT COUNT(*) FROM referencing_table_2;
<waiting ...>
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
step s3-select-count-from-ref-table: <... completed>
count
---------------------------------------------------------------------
0
(1 row)
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
---------------------------------------------------------------------
(1 row)
restore_isolation_tester_func
---------------------------------------------------------------------
(1 row)

View File

@ -251,19 +251,13 @@ step s2-truncate:
step s1-commit:
COMMIT;
s2: WARNING: relation "public.dist_table" does not exist
s2: WARNING: relation "public.dist_table" does not exist
step s2-truncate: <... completed>
ERROR: failure on connection marked as essential: localhost:xxxxx
step s2-select:
SELECT * FROM dist_table ORDER BY 1, 2;
a|b
---------------------------------------------------------------------
1|2
3|4
5|6
(3 rows)
(0 rows)
restore_isolation_tester_func
---------------------------------------------------------------------

View File

@ -73,6 +73,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
-- make sure that pg_class queries do not get blocked on table locks
begin;
SET LOCAL citus.enable_ddl_propagation TO OFF;
lock table test_table in access exclusive mode;
prepare transaction 'take-aggressive-lock';
-- shards are hidden when using psql as application_name

View File

@ -94,5 +94,6 @@ test: isolation_metadata_sync_deadlock
test: isolation_replicated_dist_on_mx
test: isolation_replicate_reference_tables_to_coordinator
test: isolation_multiuser_locking
test: isolation_acquire_distributed_locks
test: isolation_check_mx

View File

@ -61,6 +61,7 @@ test: locally_execute_intermediate_results
test: multi_mx_alter_distributed_table
test: update_colocation_mx
test: resync_metadata_with_sequences
test: distributed_locks
# should be executed sequentially because it modifies metadata
test: local_shard_execution_dropped_column

View File

@ -464,6 +464,7 @@ push(@pgOptions, "citus.shard_replication_factor=2");
push(@pgOptions, "citus.node_connection_timeout=${connectionTimeout}");
push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -0,0 +1,242 @@
#include "isolation_mx_common.include.spec"
setup
{
SELECT citus_set_coordinator_host('localhost', 57636);
CREATE TABLE dist_table(a int);
CREATE TABLE citus_local_table(a int);
CREATE TABLE local_table(a int);
CREATE TABLE ref_table(a int);
CREATE TABLE partitioned_table(a int)
PARTITION BY RANGE(a);
CREATE TABLE partition_1 PARTITION OF partitioned_table
FOR VALUES FROM (1) TO (11);
CREATE TABLE partition_2 PARTITION OF partitioned_table
FOR VALUES FROM (11) TO (21);
SELECT create_distributed_table('dist_table', 'a');
SELECT create_reference_table('ref_table');
SELECT citus_add_local_table_to_metadata('citus_local_table');
SELECT create_distributed_table('partitioned_table', 'a');
CREATE VIEW sub_view(a) AS
SELECT 2 * a AS a
FROM ref_table;
CREATE VIEW main_view AS
SELECT t1.a a1, t2.a a2, t3.a a3
FROM dist_table t1
JOIN citus_local_table t2 ON t1.a = t2.a
JOIN sub_view t3 ON t2.a = t3.a;
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO citus_local_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO local_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO ref_table SELECT n FROM generate_series(1, 5) n;
INSERT INTO partitioned_table SELECT n FROM generate_series(8, 12) n;
}
teardown
{
DROP VIEW main_view;
DROP VIEW sub_view;
DROP TABLE dist_table;
DROP TABLE citus_local_table;
DROP TABLE local_table;
DROP TABLE ref_table;
DROP TABLE partitioned_table;
SELECT citus_remove_node('localhost', 57636);
SELECT citus_internal.restore_isolation_tester_func();
}
// coordinator session
session "coor"
step "coor-begin"
{
BEGIN;
}
step "coor-acquire-aggresive-lock-on-dist-table"
{
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-dist-table-nowait"
{
LOCK dist_table IN ACCESS EXCLUSIVE MODE NOWAIT;
}
step "coor-acquire-weak-lock-on-dist-table"
{
LOCK dist_table IN ACCESS SHARE MODE;
}
step "coor-acquire-aggresive-lock-on-view"
{
LOCK main_view IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-only-view"
{
LOCK ONLY main_view IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-view-nowait"
{
LOCK main_view IN ACCESS EXCLUSIVE MODE NOWAIT;
}
step "coor-lock-all"
{
LOCK dist_table, citus_local_table, ref_table, main_view, sub_view, local_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-read-dist-table"
{
SELECT COUNT(*) FROM dist_table;
}
step "coor-read-ref-table"
{
SELECT COUNT(*) FROM ref_table;
}
step "coor-acquire-aggresive-lock-on-partitioned-table"
{
LOCK partitioned_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax"
{
LOCK partitioned_table * IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-only-partitioned-table"
{
LOCK ONLY partitioned_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-acquire-aggresive-lock-on-ref-table"
{
LOCK ref_table IN ACCESS EXCLUSIVE MODE;
}
step "coor-rollback"
{
ROLLBACK;
}
// worker 1 xact session
session "w1"
step "w1-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "w1-begin"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w1-read-dist-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM dist_table');
}
step "w1-read-ref-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM ref_table');
}
step "w1-read-citus-local-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM citus_local_table');
}
step "w1-acquire-aggressive-lock-dist-table" {
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
}
step "w1-lock-reference-table"
{
SELECT run_commands_on_session_level_connection_to_node('LOCK ref_table IN ACCESS EXCLUSIVE MODE');
}
step "w1-read-partitioned-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partitioned_table');
}
step "w1-read-partition-of-partitioned-table"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM partition_1');
}
step "w1-read-main-view"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT COUNT(*) FROM main_view');
}
step "w1-rollback"
{
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
}
step "w1-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
// worker 2 xact session
session "w2"
step "w2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "w2-begin"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "w2-acquire-aggressive-lock-dist-table" {
SELECT run_commands_on_session_level_connection_to_node('LOCK dist_table IN ACCESS EXCLUSIVE MODE');
}
step "w2-rollback"
{
SELECT run_commands_on_session_level_connection_to_node('ROLLBACK');
}
step "w2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "coor-read-dist-table" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-dist-table-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w2-start-session-level-connection" "w2-begin" "w1-acquire-aggressive-lock-dist-table" "w2-acquire-aggressive-lock-dist-table" "w1-rollback" "w1-read-dist-table" "w2-rollback" "w1-stop-connection" "w2-stop-connection"
permutation "coor-begin" "coor-acquire-weak-lock-on-dist-table" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-lock-reference-table" "coor-begin" "coor-read-ref-table" "w1-rollback" "coor-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-view" "w1-start-session-level-connection" "w1-begin" "w1-read-ref-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "w1-start-session-level-connection" "w1-begin" "w1-acquire-aggressive-lock-dist-table" "coor-begin" "coor-acquire-aggresive-lock-on-view-nowait" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-lock-all" "w1-start-session-level-connection" "w1-begin" "w1-read-citus-local-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-partitioned-table-with-*-syntax" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-only-partitioned-table" "w1-start-session-level-connection" "w1-begin" "w1-read-partition-of-partitioned-table" "coor-rollback" "w1-rollback" "w1-stop-connection"
permutation "coor-begin" "coor-acquire-aggresive-lock-on-ref-table" "w1-start-session-level-connection" "w1-begin" "w1-read-main-view" "coor-rollback" "w1-rollback" "w1-stop-connection"

View File

@ -2,17 +2,29 @@
setup
{
CREATE TABLE truncate_table(id integer, value integer);
CREATE TABLE truncate_table(id integer, value integer, PRIMARY KEY(id));
CREATE TABLE data_table(id integer, value integer);
CREATE TABLE referencing_table_1 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES truncate_table(id));
CREATE TABLE referencing_table_2 (id integer, PRIMARY KEY(id), FOREIGN KEY (id) REFERENCES referencing_table_1(id));
SELECT create_distributed_table('truncate_table', 'id');
SELECT create_distributed_table('data_table', 'id');
SELECT create_distributed_table('referencing_table_1', 'id');
SELECT create_distributed_table('referencing_table_2', 'id');
COPY truncate_table FROM PROGRAM 'echo 1, 10 && echo 2, 20 && echo 3, 30 && echo 4, 40 && echo 5, 50' WITH CSV;
COPY data_table FROM PROGRAM 'echo 20, 20 && echo 30, 30 && echo 40, 40 && echo 50, 50' WITH CSV;
}
// Create and use UDF to close the connection opened in the setup step. Also return the cluster
// back to the initial state.
teardown
{
DROP TABLE IF EXISTS truncate_table CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
DROP TABLE IF EXISTS data_table;
DROP TABLE IF EXISTS referencing_table_2;
DROP TABLE IF EXISTS referencing_table_1;
DROP TABLE IF EXISTS truncate_table CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"
@ -36,7 +48,7 @@ step "s1-begin-on-worker"
step "s1-truncate"
{
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
}
step "s1-select"
@ -46,7 +58,7 @@ step "s1-select"
step "s1-insert-select"
{
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM truncate_table');
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO truncate_table SELECT * FROM data_table');
}
step "s1-delete"
@ -56,7 +68,7 @@ step "s1-delete"
step "s1-copy"
{
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 5, 50 && echo 9, 90 && echo 10, 100''WITH CSV');
SELECT run_commands_on_session_level_connection_to_node('COPY truncate_table FROM PROGRAM ''echo 6, 60 && echo 9, 90 && echo 10, 100''WITH CSV');
}
step "s1-alter"
@ -101,7 +113,7 @@ step "s2-begin-on-worker"
step "s2-truncate"
{
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table');
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE truncate_table CASCADE');
}
step "s2-commit-worker"
@ -122,6 +134,11 @@ step "s3-select-count"
SELECT COUNT(*) FROM truncate_table;
}
step "s3-select-count-from-ref-table"
{
SELECT COUNT(*) FROM referencing_table_2;
}
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
@ -131,3 +148,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
permutation "s1-begin" "s1-alter" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit" "s2-commit-worker" "s2-stop-connection" "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-truncate" "s3-select-count-from-ref-table" "s1-commit-worker" "s1-stop-connection"

View File

@ -0,0 +1,149 @@
CREATE SCHEMA distribute_lock_tests;
SET search_path TO distribute_lock_tests;
SET citus.next_shard_id TO 10000;
CREATE TABLE dist_table(a int);
SELECT create_distributed_table('dist_table', 'a');
INSERT INTO dist_table SELECT n FROM generate_series(1, 5) n;
-- Test acquiring lock outside transaction
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
-- Test acquiring lock inside procedure
DO $$
BEGIN
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
END;
$$;
-- Try all valid lock options; also try omitting the optional TABLE keyword.
BEGIN TRANSACTION;
LOCK TABLE dist_table IN ACCESS SHARE MODE;
LOCK dist_table IN ROW SHARE MODE;
LOCK TABLE dist_table IN ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE UPDATE EXCLUSIVE MODE;
LOCK TABLE dist_table IN SHARE MODE;
LOCK dist_table IN SHARE ROW EXCLUSIVE MODE;
LOCK TABLE dist_table IN EXCLUSIVE MODE;
LOCK TABLE dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test that when the user does not have the required permissions to lock
-- the locks are not forwarded to the workers
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
RESET citus.enable_ddl_propagation;
$$);
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE read_only_user WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO read_only_user;
GRANT SELECT ON dist_table TO read_only_user;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE read_only_user;
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
SET citus.log_remote_commands TO OFF;
RESET ROLE;
-- test that user with view permissions can lock the tables
-- which the view is built on
CREATE VIEW myview AS SELECT * FROM dist_table;
SET client_min_messages TO ERROR;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON distribute_lock_tests.myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);
SET citus.enable_ddl_propagation TO OFF;
CREATE ROLE user_with_view_permissions WITH LOGIN;
GRANT ALL ON SCHEMA distribute_lock_tests TO user_with_view_permissions;
GRANT ALL ON myview TO user_with_view_permissions;
RESET citus.enable_ddl_propagation;
RESET client_min_messages;
SET ROLE TO user_with_view_permissions;
BEGIN;
LOCK myview IN ACCESS EXCLUSIVE MODE;
SELECT run_command_on_workers($$
SELECT mode FROM pg_locks WHERE relation = 'distribute_lock_tests.dist_table'::regclass;
$$);
ROLLBACK;
RESET ROLE;
\c - - - :worker_1_port
SET search_path TO distribute_lock_tests;
-- Test trying to lock from a worker when the coordinator is not in the metadata
SET citus.allow_unsafe_locks_from_workers TO 'off';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Verify that the same restriction does not apply to worker local tables
CREATE TABLE local_table(a int);
-- Verify that no locks will be distributed for the local lock
SET citus.log_remote_commands TO ON;
BEGIN;
LOCK local_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.log_remote_commands;
-- Cleanup local table
DROP TABLE local_table;
-- Test that setting the guc to 'on' will allow the lock from workers
SET citus.allow_unsafe_locks_from_workers TO 'on';
BEGIN;
LOCK dist_table IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test locking a shard
SET citus.enable_manual_changes_to_shards TO OFF;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
-- Test allowing shard locks with the citus.enable_manual_changes_to_shards guc
SET citus.enable_manual_changes_to_shards TO ON;
BEGIN;
LOCK dist_table_10000 IN ACCESS EXCLUSIVE MODE;
ROLLBACK;
RESET citus.enable_manual_changes_to_shards;
\c - - - :master_port
DROP SCHEMA distribute_lock_tests CASCADE;
SET citus.enable_ddl_propagation TO OFF;
DROP ROLE read_only_user;
DROP ROLE user_with_view_permissions;
RESET citus.enable_ddl_propagation;
SELECT run_command_on_workers($$
SET citus.enable_ddl_propagation TO OFF;
DROP USER read_only_user;
DROP USER user_with_view_permissions;
RESET citus.enable_ddl_propagation;
$$);

View File

@ -43,7 +43,9 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
-- make sure that pg_class queries do not get blocked on table locks
begin;
SET LOCAL citus.enable_ddl_propagation TO OFF;
lock table test_table in access exclusive mode;
prepare transaction 'take-aggressive-lock';
-- shards are hidden when using psql as application_name