Support role commands from any node (#7278)

DESCRIPTION: Adds support for issuing role management commands from worker nodes

It's unlikely to get into a distributed deadlock with role commands, so we
don't worry much about that at the moment.
There were several attempts to reduce the chances of a deadlock, but none of
them has been merged into the main branch yet, see:
#7325
#7016
#7009
Onur Tirtir 2023-11-10 12:58:51 +03:00 committed by GitHub
parent 57ff762c82
commit 240313e286
25 changed files with 758 additions and 168 deletions
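
As a rough sketch of what this change enables (role names below are hypothetical, and propagation still honors the citus.enable_create_role_propagation / citus.enable_alter_role_propagation settings), role commands can now be issued while connected to a worker node:

-- connected to any worker node, with the coordinator registered in pg_dist_node
CREATE ROLE app_admin;
CREATE ROLE app_user LOGIN;
ALTER ROLE app_user CREATEDB;
GRANT app_admin TO app_user;
-- each command runs locally and is also relayed to the coordinator
-- and to the remaining worker nodes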


@ -40,14 +40,14 @@ static char * DropTableIfExistsCommand(Oid relationId);
/* /*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes
* sure these are available on all workers. If not available they will be created on the * sure these are available on all nodes. If not available they will be created on the
* workers via a separate session that will be committed directly so that the objects are * nodes via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards. * visible to potentially multiple sessions creating the shards.
* *
* Note; only the actual objects are created via a separate session, the records to * Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be * pg_dist_object are created in this session. As a side effect the objects could be
* created on the workers without a catalog entry. Updates to the objects on the coordinator * created on the nodes without a catalog entry. Updates to the objects on local node
* are not propagated to the workers until the record is visible on the coordinator. * are not propagated to the remote nodes until the record is visible on local node.
* *
* This is solved by creating the dependencies in an idempotent manner, either via * This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions. * postgres native CREATE IF NOT EXISTS, or citus helper functions.
@ -95,7 +95,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* either get it now, or get it in citus_add_node after this transaction finishes and * either get it now, or get it in citus_add_node after this transaction finishes and
* the pg_dist_object record becomes visible. * the pg_dist_object record becomes visible.
*/ */
List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);
/* /*
* Lock dependent objects explicitly to make sure same DDL command won't be sent * Lock dependent objects explicitly to make sure same DDL command won't be sent
@ -127,12 +127,12 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
*/ */
if (HasAnyDependencyInPropagatedObjects(target)) if (HasAnyDependencyInPropagatedObjects(target))
{ {
SendCommandListToWorkersWithMetadata(ddlCommands); SendCommandListToRemoteNodesWithMetadata(ddlCommands);
} }
else else
{ {
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, remoteNodeList)
{ {
const char *nodeName = workerNode->workerName; const char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
@ -144,8 +144,8 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
} }
/* /*
* We do this after creating the objects on the workers, we make sure * We do this after creating the objects on remote nodes, we make sure
* that objects have been created on worker nodes before marking them * that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail. * distributed, so MarkObjectDistributed wouldn't fail.
*/ */
foreach_ptr(dependency, dependenciesWithCommands) foreach_ptr(dependency, dependenciesWithCommands)


@ -180,6 +180,8 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
return NIL; return NIL;
} }
EnsureCoordinator();
if (createIndexStatement->idxname == NULL) if (createIndexStatement->idxname == NULL)
{ {
/* /*


@ -156,7 +156,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
return NIL; return NIL;
} }
EnsureCoordinator(); EnsurePropagationToCoordinator();
AlterRoleStmt *stmt = castNode(AlterRoleStmt, node); AlterRoleStmt *stmt = castNode(AlterRoleStmt, node);
@ -185,7 +185,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
(void *) CreateAlterRoleIfExistsCommand(stmt), (void *) CreateAlterRoleIfExistsCommand(stmt),
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -231,7 +231,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
return NIL; return NIL;
} }
EnsureCoordinator(); EnsurePropagationToCoordinator();
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
const char *sql = DeparseTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt);
@ -240,7 +240,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commandList); return NodeDDLTaskList(REMOTE_NODES, commandList);
} }
@ -910,7 +910,8 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString,
return NIL; return NIL;
} }
EnsureCoordinator(); EnsurePropagationToCoordinator();
EnsureSequentialModeForRoleDDL(); EnsureSequentialModeForRoleDDL();
LockRelationOid(DistNodeRelationId(), RowShareLock); LockRelationOid(DistNodeRelationId(), RowShareLock);
@ -945,7 +946,7 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString,
commands = lappend(commands, ENABLE_DDL_PROPAGATION); commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1041,7 +1042,8 @@ PreprocessDropRoleStmt(Node *node, const char *queryString,
return NIL; return NIL;
} }
EnsureCoordinator(); EnsurePropagationToCoordinator();
EnsureSequentialModeForRoleDDL(); EnsureSequentialModeForRoleDDL();
@ -1053,7 +1055,7 @@ PreprocessDropRoleStmt(Node *node, const char *queryString,
sql, sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1130,7 +1132,7 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
return NIL; return NIL;
} }
EnsureCoordinator(); EnsurePropagationToCoordinator();
GrantRoleStmt *stmt = castNode(GrantRoleStmt, node); GrantRoleStmt *stmt = castNode(GrantRoleStmt, node);
List *allGranteeRoles = stmt->grantee_roles; List *allGranteeRoles = stmt->grantee_roles;
@ -1170,7 +1172,7 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
sql, sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1181,11 +1183,13 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
List * List *
PostprocessGrantRoleStmt(Node *node, const char *queryString) PostprocessGrantRoleStmt(Node *node, const char *queryString)
{ {
if (!EnableCreateRolePropagation || !IsCoordinator() || !ShouldPropagate()) if (!EnableCreateRolePropagation || !ShouldPropagate())
{ {
return NIL; return NIL;
} }
EnsurePropagationToCoordinator();
GrantRoleStmt *stmt = castNode(GrantRoleStmt, node); GrantRoleStmt *stmt = castNode(GrantRoleStmt, node);
RoleSpec *role = NULL; RoleSpec *role = NULL;
@ -1333,7 +1337,7 @@ PreprocessAlterRoleRenameStmt(Node *node, const char *queryString,
Assert(stmt->renameType == OBJECT_ROLE); Assert(stmt->renameType == OBJECT_ROLE);
EnsureCoordinator(); EnsurePropagationToCoordinator();
char *sql = DeparseTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt);
@ -1341,7 +1345,7 @@ PreprocessAlterRoleRenameStmt(Node *node, const char *queryString,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }


@ -708,9 +708,9 @@ citus_ProcessUtilityInternal(PlannedStmt *pstmt,
} }
else if (IsA(parsetree, CreateRoleStmt) && !EnableCreateRolePropagation) else if (IsA(parsetree, CreateRoleStmt) && !EnableCreateRolePropagation)
{ {
ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker" ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to other"
" nodes"), " nodes"),
errhint("Connect to worker nodes directly to manually create all" errhint("Connect to other nodes directly to manually create all"
" necessary users and roles."))); " necessary users and roles.")));
} }
@ -1106,16 +1106,17 @@ IsDropSchemaOrDB(Node *parsetree)
* each shard placement and COMMIT/ROLLBACK is handled by * each shard placement and COMMIT/ROLLBACK is handled by
* CoordinatedTransactionCallback function. * CoordinatedTransactionCallback function.
* *
* The function errors out if the node is not the coordinator or if the DDL is on * The function errors out if the DDL is on a partitioned table which has replication
* a partitioned table which has replication factor > 1. * factor > 1, or if the coordinator is not added into metadata and we're on a
* * worker node because we want to make sure that distributed DDL jobs are executed
* on the coordinator node too. See EnsurePropagationToCoordinator() for more details.
*/ */
void void
ExecuteDistributedDDLJob(DDLJob *ddlJob) ExecuteDistributedDDLJob(DDLJob *ddlJob)
{ {
bool shouldSyncMetadata = false; bool shouldSyncMetadata = false;
EnsureCoordinator(); EnsurePropagationToCoordinator();
ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress; ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
@ -1139,23 +1140,24 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{ {
if (shouldSyncMetadata) if (shouldSyncMetadata)
{ {
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
char *currentSearchPath = CurrentSearchPath(); char *currentSearchPath = CurrentSearchPath();
/* /*
* Given that we're relaying the query to the worker nodes directly, * Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary. * we should set the search path exactly the same when necessary.
*/ */
if (currentSearchPath != NULL) if (currentSearchPath != NULL)
{ {
SendCommandToWorkersWithMetadata( SendCommandToRemoteNodesWithMetadata(
psprintf("SET LOCAL search_path TO %s;", currentSearchPath)); psprintf("SET LOCAL search_path TO %s;", currentSearchPath));
} }
if (ddlJob->metadataSyncCommand != NULL) if (ddlJob->metadataSyncCommand != NULL)
{ {
SendCommandToWorkersWithMetadata((char *) ddlJob->metadataSyncCommand); SendCommandToRemoteNodesWithMetadata(
(char *) ddlJob->metadataSyncCommand);
} }
} }
@ -1234,7 +1236,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
char *currentSearchPath = CurrentSearchPath(); char *currentSearchPath = CurrentSearchPath();
/* /*
* Given that we're relaying the query to the worker nodes directly, * Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary. * we should set the search path exactly the same when necessary.
*/ */
if (currentSearchPath != NULL) if (currentSearchPath != NULL)
@ -1246,7 +1248,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
commandList = lappend(commandList, (char *) ddlJob->metadataSyncCommand); commandList = lappend(commandList, (char *) ddlJob->metadataSyncCommand);
SendBareCommandListToMetadataWorkers(commandList); SendBareCommandListToRemoteMetadataNodes(commandList);
} }
} }
PG_CATCH(); PG_CATCH();


@ -302,8 +302,8 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* seem to cause any problems as none of the placements that we are * seem to cause any problems as none of the placements that we are
* going to access would be on the new node. * going to access would be on the new node.
*/ */
List *primaryNodeList = ActivePrimaryRemoteNodeList(NoLock); List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(primaryNodeList); EnsureConnectionPossibilityForNodeList(remoteNodeList);
} }


@ -149,7 +149,7 @@ ObjectExists(const ObjectAddress *address)
/* /*
* MarkObjectDistributed marks an object as a distributed object. Marking is done * MarkObjectDistributed marks an object as a distributed object. Marking is done
* by adding appropriate entries to citus.pg_dist_object and also marking the object * by adding appropriate entries to citus.pg_dist_object and also marking the object
* as distributed by opening a connection using current user to all of the workers * as distributed by opening a connection using current user to all remote nodes
* with metadata if object propagation is on. * with metadata if object propagation is on.
* *
* This function should be used if the user creating the given object. If you want * This function should be used if the user creating the given object. If you want
@ -164,7 +164,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
{ {
char *workerPgDistObjectUpdateCommand = char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress); CreatePgDistObjectEntryCommand(distAddress);
SendCommandToWorkersWithMetadata(workerPgDistObjectUpdateCommand); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
} }
} }
@ -172,7 +172,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
/* /*
* MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking * MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking
* is done by adding appropriate entries to citus.pg_dist_object and also marking the * is done by adding appropriate entries to citus.pg_dist_object and also marking the
* object as distributed by opening a connection using super user to all of the workers * object as distributed by opening a connection using super user to all remote nodes
* with metadata if object propagation is on. * with metadata if object propagation is on.
* *
* This function should be used to mark dependent object as distributed. If you want * This function should be used to mark dependent object as distributed. If you want
@ -187,7 +187,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
{ {
char *workerPgDistObjectUpdateCommand = char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress); CreatePgDistObjectEntryCommand(distAddress);
SendCommandToWorkersWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
} }
} }


@ -134,7 +134,7 @@ static bool ShouldSkipMetadataChecks(void);
static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod,
int colocationId, char replicationModel, int colocationId, char replicationModel,
Var *distributionKey); Var *distributionKey);
static void EnsureCoordinatorInitiatedOperation(void); static void EnsureCitusInitiatedOperation(void);
static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType, static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType,
text *shardMinValue, text *shardMinValue,
text *shardMaxValue); text *shardMaxValue);
@ -1001,7 +1001,7 @@ citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed for executing as a separate command */ /* this UDF is not allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
/* /*
* Ensure given distributionArgumentIndex and colocationId values are * Ensure given distributionArgumentIndex and colocationId values are
@ -3090,7 +3090,7 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
if (distributionMethod == DISTRIBUTE_BY_NONE && distributionColumnVar != NULL) if (distributionMethod == DISTRIBUTE_BY_NONE && distributionColumnVar != NULL)
{ {
@ -3206,7 +3206,7 @@ citus_internal_delete_partition_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
} }
DeletePartitionRow(relationId); DeletePartitionRow(relationId);
@ -3254,7 +3254,7 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
/* /*
* Even if the table owner is a malicious user and the shard metadata is * Even if the table owner is a malicious user and the shard metadata is
@ -3272,19 +3272,13 @@ citus_internal_add_shard_metadata(PG_FUNCTION_ARGS)
/* /*
* EnsureCoordinatorInitiatedOperation is a helper function which ensures that * EnsureCitusInitiatedOperation is a helper function which ensures that
* the execution is initiated by the coordinator on a worker node. * the execution is initiated by Citus.
*/ */
static void static void
EnsureCoordinatorInitiatedOperation(void) EnsureCitusInitiatedOperation(void)
{ {
/* if (!(IsCitusInternalBackend() || IsRebalancerInternalBackend()))
* We are restricting the operation to only MX workers with the local group id
* check. The other two checks are to ensure that the operation is initiated
* by the coordinator.
*/
if (!(IsCitusInternalBackend() || IsRebalancerInternalBackend()) ||
GetLocalGroupId() == COORDINATOR_GROUP_ID)
{ {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("This is an internal Citus function can only be " errmsg("This is an internal Citus function can only be "
@ -3465,7 +3459,7 @@ citus_internal_delete_placement_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
} }
DeleteShardPlacementRow(placementId); DeleteShardPlacementRow(placementId);
@ -3513,7 +3507,7 @@ citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength,
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
/* /*
* Even if the table owner is a malicious user, as long as the shard placements * Even if the table owner is a malicious user, as long as the shard placements
@ -3608,7 +3602,7 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
if (!ShardExists(shardId)) if (!ShardExists(shardId))
{ {
@ -3672,7 +3666,7 @@ citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
if (!ShardExists(shardId)) if (!ShardExists(shardId))
{ {
@ -3715,7 +3709,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
/* ensure that the table is in pg_dist_partition */ /* ensure that the table is in pg_dist_partition */
char partitionMethod = PartitionMethodViaCatalog(relationId); char partitionMethod = PartitionMethodViaCatalog(relationId);
@ -3781,7 +3775,7 @@ citus_internal_add_colocation_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
} }
InsertColocationGroupLocally(colocationId, shardCount, replicationFactor, InsertColocationGroupLocally(colocationId, shardCount, replicationFactor,
@ -3806,7 +3800,7 @@ citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
/* this UDF is not allowed allowed for executing as a separate command */ /* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
} }
DeleteColocationGroupLocally(colocationId); DeleteColocationGroupLocally(colocationId);
@ -3885,7 +3879,7 @@ citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
if (!ShouldSkipMetadataChecks()) if (!ShouldSkipMetadataChecks())
{ {
EnsureCoordinatorInitiatedOperation(); EnsureCitusInitiatedOperation();
} }
UpdateNoneDistTableMetadata(relationId, replicationModel, UpdateNoneDistTableMetadata(relationId, replicationModel,


@ -2742,6 +2742,25 @@ EnsureCoordinator(void)
} }
/*
* EnsurePropagationToCoordinator checks whether the coordinator is added to the
* metadata if we're not on the coordinator.
*
* Given that metadata syncing skips syncing metadata to the coordinator, we need
* to make sure that the coordinator is added to the metadata before propagating
* a command from a worker. For this reason, today we use this only for the commands
* that we support propagating from workers.
*/
void
EnsurePropagationToCoordinator(void)
{
if (!IsCoordinator())
{
EnsureCoordinatorIsInMetadata();
}
}
/* /*
* EnsureCoordinatorIsInMetadata checks whether the coordinator is added to the * EnsureCoordinatorIsInMetadata checks whether the coordinator is added to the
* metadata, which is required for many operations. * metadata, which is required for many operations.
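
To illustrate the precondition that EnsurePropagationToCoordinator() enforces, here is a minimal SQL sketch (the hostname is a placeholder) of registering the coordinator in the metadata so that commands issued from a worker can be propagated back to it:

-- run once on the coordinator so that workers can reach it
SELECT citus_set_coordinator_host('coordinator-host');
-- or, as done in the new regression test below:
-- SELECT citus_add_node('localhost', :master_port, groupid => 0);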


@ -180,7 +180,7 @@ ActivePrimaryNodeList(LOCKMODE lockMode)
/* /*
* ActivePrimaryRemoteNodeList returns a list of all active primary nodes in * ActivePrimaryRemoteNodeList returns a list of all active primary nodes in
* workerNodeHash. * workerNodeHash except the local one.
*/ */
List * List *
ActivePrimaryRemoteNodeList(LOCKMODE lockMode) ActivePrimaryRemoteNodeList(LOCKMODE lockMode)


@ -34,6 +34,12 @@
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/builtins.h" #include "utils/builtins.h"
static void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues);
static void SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet);
static void SendCommandToMetadataWorkersParams(const char *command, static void SendCommandToMetadataWorkersParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,
@ -150,6 +156,74 @@ SendCommandListToWorkersWithMetadata(List *commands)
} }
/*
* SendCommandToRemoteNodesWithMetadata sends a command to remote nodes in
* parallel. Commands are committed on the nodes when the local transaction
* commits.
*/
void
SendCommandToRemoteNodesWithMetadata(const char *command)
{
SendCommandToRemoteMetadataNodesParams(command, CurrentUserName(),
0, NULL, NULL);
}
/*
* SendCommandToRemoteNodesWithMetadataViaSuperUser sends a command to remote
* nodes in parallel by opening a super user connection. Commands are committed
* on the nodes when the local transaction commits. The connections are made as
* the extension owner to ensure write access to the Citus metadata tables.
*
* Since we prevent opening superuser connections for metadata tables, using
* this function is discouraged. Consider using it only for propagating
* pg_dist_object tuples for dependent objects.
*/
void
SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command)
{
SendCommandToRemoteMetadataNodesParams(command, CitusExtensionOwnerName(),
0, NULL, NULL);
}
/*
* SendCommandListToRemoteNodesWithMetadata sends all commands to remote nodes
* with the current user. See `SendCommandToRemoteNodesWithMetadata` for details.
*/
void
SendCommandListToRemoteNodesWithMetadata(List *commands)
{
char *command = NULL;
foreach_ptr(command, commands)
{
SendCommandToRemoteNodesWithMetadata(command);
}
}
/*
* SendCommandToRemoteMetadataNodesParams is a wrapper around
* SendCommandToWorkersParamsInternal() that can be used to send commands
* to remote metadata nodes.
*/
static void
SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount,
const Oid *parameterTypes,
const char *const *parameterValues)
{
/* use METADATA_NODES so that ErrorIfAnyMetadataNodeOutOfSync checks local node as well */
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES,
RowShareLock);
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
SendCommandToWorkersParamsInternal(REMOTE_METADATA_NODES, command, user,
parameterCount, parameterTypes, parameterValues);
}
/* /*
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
* TargetWorkerSet. * TargetWorkerSet.
@ -162,17 +236,29 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{ {
workerNodeList = ActivePrimaryNodeList(lockMode); workerNodeList = ActivePrimaryNodeList(lockMode);
} }
else else if (targetWorkerSet == REMOTE_NODES || targetWorkerSet == REMOTE_METADATA_NODES)
{
workerNodeList = ActivePrimaryRemoteNodeList(lockMode);
}
else if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
targetWorkerSet == NON_COORDINATOR_NODES)
{ {
workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode); workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
} }
else
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid target worker set: %d", targetWorkerSet)));
}
List *result = NIL; List *result = NIL;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
{ {
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet == if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
METADATA_NODES) && targetWorkerSet == REMOTE_METADATA_NODES ||
targetWorkerSet == METADATA_NODES) &&
!workerNode->hasMetadata) !workerNode->hasMetadata)
{ {
continue; continue;
@ -186,16 +272,42 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
/* /*
* SendBareCommandListToMetadataWorkers sends a list of commands to metadata * SendBareCommandListToRemoteMetadataNodes is a wrapper around
* workers in serial. Commands are committed immediately: new connections are * SendBareCommandListToMetadataNodesInternal() that can be used to send
* always used and no transaction block is used (hence "bare"). The connections * bare commands to remote metadata nodes.
* are made as the extension owner to ensure write access to the Citus metadata */
* tables. Primarly useful for INDEX commands using CONCURRENTLY. void
SendBareCommandListToRemoteMetadataNodes(List *commandList)
{
SendBareCommandListToMetadataNodesInternal(commandList,
REMOTE_METADATA_NODES);
}
/*
* SendBareCommandListToMetadataWorkers is a wrapper around
* SendBareCommandListToMetadataNodesInternal() that can be used to send
* bare commands to metadata workers.
*/ */
void void
SendBareCommandListToMetadataWorkers(List *commandList) SendBareCommandListToMetadataWorkers(List *commandList)
{ {
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES; SendBareCommandListToMetadataNodesInternal(commandList,
NON_COORDINATOR_METADATA_NODES);
}
/*
* SendBareCommandListToMetadataNodesInternal sends a list of commands to given
* target worker set in serial. Commands are committed immediately: new connections
* are always used and no transaction block is used (hence "bare"). The connections
* are made as the extension owner to ensure write access to the Citus metadata
* tables. Primarily useful for INDEX commands using CONCURRENTLY.
*/
static void
SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet)
{
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock); List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();


@ -87,6 +87,7 @@ extern WorkerNode * FindNodeWithNodeId(int nodeId, bool missingOk);
extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); extern WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
extern List * ReadDistNode(bool includeNodesFromOtherClusters); extern List * ReadDistNode(bool includeNodesFromOtherClusters);
extern void EnsureCoordinator(void); extern void EnsureCoordinator(void);
extern void EnsurePropagationToCoordinator(void);
extern void EnsureCoordinatorIsInMetadata(void); extern void EnsureCoordinatorIsInMetadata(void);
extern void InsertCoordinatorIfClusterEmpty(void); extern void InsertCoordinatorIfClusterEmpty(void);
extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern uint32 GroupForNode(char *nodeName, int32 nodePort);


@ -29,11 +29,22 @@ typedef enum TargetWorkerSet
*/ */
NON_COORDINATOR_METADATA_NODES, NON_COORDINATOR_METADATA_NODES,
/*
* All the active primary nodes in the metadata which have metadata
* except the local node
*/
REMOTE_METADATA_NODES,
/* /*
* All the active primary nodes in the metadata except the coordinator * All the active primary nodes in the metadata except the coordinator
*/ */
NON_COORDINATOR_NODES, NON_COORDINATOR_NODES,
/*
* All the active primary nodes in the metadata except the local node
*/
REMOTE_NODES,
/* /*
* All active primary nodes in the metadata * All active primary nodes in the metadata
*/ */
@ -74,6 +85,10 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToWorkersWithMetadata(List *commands); extern void SendCommandListToWorkersWithMetadata(List *commands);
extern void SendCommandToRemoteNodesWithMetadata(const char *command);
extern void SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToRemoteNodesWithMetadata(List *commands);
extern void SendBareCommandListToRemoteMetadataNodes(List *commandList);
extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void); extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,


@ -40,18 +40,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
CREATE ROLE create_role_with_everything SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 105 PASSWORD 'strong_password123^' VALID UNTIL '2045-05-05 00:00:00.00+00' IN ROLE create_role, create_group ROLE create_user, create_group_2 ADMIN create_role_2, create_user_2; CREATE ROLE create_role_with_everything SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 105 PASSWORD 'strong_password123^' VALID UNTIL '2045-05-05 00:00:00.00+00' IN ROLE create_role, create_group ROLE create_user, create_group_2 ADMIN create_role_2, create_user_2;
CREATE ROLE create_role_with_nothing NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT 3 PASSWORD 'weakpassword' VALID UNTIL '2015-05-05 00:00:00.00+00'; CREATE ROLE create_role_with_nothing NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT 3 PASSWORD 'weakpassword' VALID UNTIL '2015-05-05 00:00:00.00+00';
-- show that creating role from worker node is only allowed when create role -- show that creating role from worker node is allowed
-- propagation is off
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE ROLE role_on_worker; CREATE ROLE role_on_worker;
ERROR: operation is not allowed on this node DROP ROLE role_on_worker;
HINT: Connect to the coordinator and run it again.
BEGIN;
SET citus.enable_create_role_propagation TO off;
CREATE ROLE role_on_worker;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
ROLLBACK;
\c - - - :master_port \c - - - :master_port
-- edge case role names -- edge case role names
CREATE ROLE "create_role'edge"; CREATE ROLE "create_role'edge";
@ -217,17 +209,17 @@ CREATE ROLE dist_role_3;
CREATE ROLE dist_role_4; CREATE ROLE dist_role_4;
SET citus.enable_create_role_propagation TO OFF; SET citus.enable_create_role_propagation TO OFF;
CREATE ROLE non_dist_role_1 SUPERUSER; CREATE ROLE non_dist_role_1 SUPERUSER;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE non_dist_role_2; CREATE ROLE non_dist_role_2;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE non_dist_role_3; CREATE ROLE non_dist_role_3;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE non_dist_role_4; CREATE ROLE non_dist_role_4;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
SET citus.enable_create_role_propagation TO ON; SET citus.enable_create_role_propagation TO ON;
SET ROLE dist_role_1; SET ROLE dist_role_1;
GRANT non_dist_role_1 TO non_dist_role_2; GRANT non_dist_role_1 TO non_dist_role_2;
@ -307,11 +299,11 @@ CREATE ROLE dist_mixed_3;
CREATE ROLE dist_mixed_4; CREATE ROLE dist_mixed_4;
SET citus.enable_create_role_propagation TO OFF; SET citus.enable_create_role_propagation TO OFF;
CREATE ROLE nondist_mixed_1; CREATE ROLE nondist_mixed_1;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE nondist_mixed_2; CREATE ROLE nondist_mixed_2;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
SELECT roleid::regrole::text AS role, member::regrole::text, grantor::regrole::text, admin_option FROM pg_auth_members WHERE roleid::regrole::text LIKE '%dist\_mixed%' ORDER BY 1, 2; SELECT roleid::regrole::text AS role, member::regrole::text, grantor::regrole::text, admin_option FROM pg_auth_members WHERE roleid::regrole::text LIKE '%dist\_mixed%' ORDER BY 1, 2;
role | member | grantor | admin_option role | member | grantor | admin_option
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -506,14 +498,14 @@ SELECT rolname, rolcanlogin FROM pg_authid WHERE rolname = 'create_role' OR roln
-- test cascading grants -- test cascading grants
SET citus.enable_create_role_propagation TO OFF; SET citus.enable_create_role_propagation TO OFF;
CREATE ROLE nondist_cascade_1; CREATE ROLE nondist_cascade_1;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE nondist_cascade_2; CREATE ROLE nondist_cascade_2;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
CREATE ROLE nondist_cascade_3; CREATE ROLE nondist_cascade_3;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
SET citus.enable_create_role_propagation TO ON; SET citus.enable_create_role_propagation TO ON;
CREATE ROLE dist_cascade; CREATE ROLE dist_cascade;
GRANT nondist_cascade_1 TO nondist_cascade_2; GRANT nondist_cascade_1 TO nondist_cascade_2;


@ -680,16 +680,7 @@ SELECT * FROM use_age_invalid ORDER BY 1;
-- verify we can validate a constraint that is already validated, can happen when we add a node while a domain constraint was not validated -- verify we can validate a constraint that is already validated, can happen when we add a node while a domain constraint was not validated
ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive; ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
-- test changing the owner of a domain -- test changing the owner of a domain
SET client_min_messages TO error;
SELECT 1 FROM run_command_on_workers($$ CREATE ROLE domain_owner; $$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
CREATE ROLE domain_owner; CREATE ROLE domain_owner;
RESET client_min_messages;
CREATE DOMAIN alter_domain_owner AS int; CREATE DOMAIN alter_domain_owner AS int;
ALTER DOMAIN alter_domain_owner OWNER TO domain_owner; ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;
SELECT u.rolname SELECT u.rolname


@ -28,8 +28,8 @@ DROP USER issue_5763_3;
-- test non-distributed role -- test non-distributed role
SET citus.enable_create_role_propagation TO off; SET citus.enable_create_role_propagation TO off;
CREATE USER issue_5763_4 WITH SUPERUSER; CREATE USER issue_5763_4 WITH SUPERUSER;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles. HINT: Connect to other nodes directly to manually create all necessary users and roles.
\c - issue_5763_4 - :master_port \c - issue_5763_4 - :master_port
set citus.enable_ddl_propagation = off; set citus.enable_ddl_propagation = off;
CREATE SCHEMA issue_5763_sc_4; CREATE SCHEMA issue_5763_sc_4;


@ -27,8 +27,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal Citus function can only be used in a distributed transaction ERROR: This is an internal Citus function can only be used in a distributed transaction
ROLLBACK; ROLLBACK;
-- in a distributed transaction and the application name is Citus -- in a distributed transaction and the application name is Citus, allowed.
-- but we are on the coordinator, so still not allowed
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
assign_distributed_transaction_id assign_distributed_transaction_id
@ -38,7 +37,11 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001'; SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's'); SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal Citus function can only be used in a distributed transaction citus_internal_add_partition_metadata
---------------------------------------------------------------------
(1 row)
ROLLBACK; ROLLBACK;
\c - postgres - \c - postgres -
\c - - - :worker_1_port \c - - - :worker_1_port


@ -70,38 +70,43 @@ SELECT create_reference_table('ref');
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
-- alter role from mx worker isn't allowed when alter role propagation is on -- to alter role locally, disable alter role propagation first
SET citus.enable_alter_role_propagation TO ON;
ALTER ROLE reprefuser WITH CREATEROLE;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
-- to alter role locally disable alter role propagation first
SET citus.enable_alter_role_propagation TO OFF; SET citus.enable_alter_role_propagation TO OFF;
ALTER ROLE reprefuser WITH CREATEROLE; ALTER ROLE reprefuser WITH CREATEROLE;
SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'; SELECT result from run_command_on_all_nodes(
rolcreatedb | rolcreaterole $$
SELECT to_jsonb(q2.*) FROM (
SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'
) q2
$$
) ORDER BY result;
result
--------------------------------------------------------------------- ---------------------------------------------------------------------
t | t {"rolcreatedb": true, "rolcreaterole": false}
(1 row) {"rolcreatedb": true, "rolcreaterole": false}
{"rolcreatedb": true, "rolcreaterole": true}
(3 rows)
RESET citus.enable_alter_role_propagation; -- alter role from mx worker is allowed
\c - - - :worker_2_port SET citus.enable_alter_role_propagation TO ON;
-- show that altering role locally on worker doesn't propagated to other worker ALTER ROLE reprefuser WITH CREATEROLE;
SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'; -- show that altering role locally on worker is propagated to coordinator and to other workers too
rolcreatedb | rolcreaterole SELECT result from run_command_on_all_nodes(
$$
SELECT to_jsonb(q2.*) FROM (
SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'
) q2
$$
) ORDER BY result;
result
--------------------------------------------------------------------- ---------------------------------------------------------------------
t | f {"rolcreatedb": true, "rolcreaterole": true}
(1 row) {"rolcreatedb": true, "rolcreaterole": true}
{"rolcreatedb": true, "rolcreaterole": true}
(3 rows)
\c - - - :master_port \c - - - :master_port
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
-- show that altering role locally on worker doesn't propagated to coordinator
SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser';
rolcreatedb | rolcreaterole
---------------------------------------------------------------------
t | f
(1 row)
SET citus.log_local_commands TO ON; SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG; SET client_min_messages TO DEBUG;
-- if the placement policy is not round-robin, SELECTs on the reference -- if the placement policy is not round-robin, SELECTs on the reference


@ -0,0 +1,274 @@
-- idempotently remove the coordinator from metadata
SELECT COUNT(citus_remove_node(nodename, nodeport)) >= 0 FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :master_port;
?column?
---------------------------------------------------------------------
t
(1 row)
-- make sure that CREATE ROLE from workers is not supported when coordinator is not added to metadata
SELECT result FROM run_command_on_workers('CREATE ROLE test_role');
result
---------------------------------------------------------------------
ERROR: coordinator is not added to the metadata
ERROR: coordinator is not added to the metadata
(2 rows)
\c - - - :master_port
CREATE SCHEMA role_command_from_any_node;
SET search_path TO role_command_from_any_node;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE OR REPLACE FUNCTION check_role_on_all_nodes(p_role_name text)
RETURNS TABLE (node_type text, result text)
AS $func$
DECLARE
v_worker_query text;
BEGIN
v_worker_query := format(
$$
SELECT to_jsonb(q1.*) FROM (
SELECT
(
SELECT COUNT(*) = 1 FROM pg_roles WHERE rolname = '%s'
) AS role_exists,
(
SELECT to_jsonb(q.*) FROM (SELECT * FROM pg_roles WHERE rolname = '%s') q
) AS role_properties,
(
SELECT COUNT(*) = 1
FROM pg_dist_object
WHERE objid = (SELECT oid FROM pg_roles WHERE rolname = '%s')
) AS pg_dist_object_record_for_role_exists,
(
SELECT COUNT(*) > 0
FROM pg_dist_object
WHERE classid = 1260 AND objid NOT IN (SELECT oid FROM pg_roles)
) AS stale_pg_dist_object_record_for_a_role_exists
) q1
$$,
p_role_name, p_role_name, p_role_name
);
RETURN QUERY
SELECT
CASE WHEN (groupid = 0 AND groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'coordinator (local)'
WHEN (groupid = 0) THEN 'coordinator (remote)'
WHEN (groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'worker node (local)'
ELSE 'worker node (remote)'
END AS node_type,
q2.result
FROM run_command_on_all_nodes(v_worker_query) q2
JOIN pg_dist_node USING (nodeid);
END;
$func$ LANGUAGE plpgsql;
\c - - - :worker_1_port
SET search_path TO role_command_from_any_node;
SET client_min_messages TO NOTICE;
SET citus.enable_create_role_propagation TO OFF;
CREATE ROLE test_role;
NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to other nodes directly to manually create all necessary users and roles.
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
CREATE ROLE test_role;
NOTICE: not propagating CREATE ROLE/USER commands to other nodes
HINT: Connect to other nodes directly to manually create all necessary users and roles.
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
SET citus.enable_create_role_propagation TO ON;
-- doesn't fail even if the role doesn't exist on other nodes
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
CREATE ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
CREATE ROLE test_role;
SET citus.enable_alter_role_propagation TO OFF;
ALTER ROLE test_role RENAME TO test_role_renamed;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": false, "pg_dist_object_record_for_role_exists": false, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
ALTER ROLE test_role_renamed RENAME TO test_role;
SET citus.enable_alter_role_propagation TO ON;
ALTER ROLE test_role RENAME TO test_role_renamed;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
SET citus.enable_alter_role_propagation TO OFF;
ALTER ROLE test_role_renamed CREATEDB;
SET citus.enable_alter_role_propagation TO ON;
SELECT node_type, (result::jsonb)->'role_properties'->'rolcreatedb' as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | false
worker node (local) | true
worker node (remote) | false
(3 rows)
ALTER ROLE test_role_renamed CREATEDB;
SELECT node_type, (result::jsonb)->'role_properties'->'rolcreatedb' as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (remote) | true
worker node (local) | true
worker node (remote) | true
(3 rows)
SET citus.enable_alter_role_set_propagation TO ON;
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO OFF;
SELECT result FROM run_command_on_all_nodes('SHOW enable_hashjoin') ORDER BY result;
result
---------------------------------------------------------------------
off
off
off
(3 rows)
SET citus.enable_alter_role_set_propagation TO OFF;
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO ON;
SELECT result FROM run_command_on_all_nodes('SHOW enable_hashjoin') ORDER BY result;
result
---------------------------------------------------------------------
off
off
on
(3 rows)
SET citus.enable_alter_role_set_propagation TO ON;
ALTER ROLE current_user IN DATABASE "regression" RESET enable_hashjoin;
CREATE ROLE another_user;
SET citus.enable_create_role_propagation TO OFF;
GRANT another_user TO test_role_renamed;
SELECT result FROM run_command_on_all_nodes($$
SELECT COUNT(*)=1 FROM pg_auth_members WHERE roleid = 'another_user'::regrole AND member = 'test_role_renamed'::regrole
$$) ORDER BY result;
result
---------------------------------------------------------------------
f
f
t
(3 rows)
SET citus.enable_create_role_propagation TO ON;
SET client_min_messages TO ERROR;
GRANT another_user TO test_role_renamed;
SET client_min_messages TO NOTICE;
SELECT result FROM run_command_on_all_nodes($$
SELECT COUNT(*)=1 FROM pg_auth_members WHERE roleid = 'another_user'::regrole AND member = 'test_role_renamed'::regrole
$$) ORDER BY result;
result
---------------------------------------------------------------------
t
t
t
(3 rows)
\c - - - :master_port
SET search_path TO role_command_from_any_node;
SET client_min_messages TO NOTICE;
SELECT citus_remove_node('localhost', :worker_1_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- make sure that citus_add_node() propagates the roles created via a worker
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
node_type | result
---------------------------------------------------------------------
coordinator (local) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
worker node (remote) | {"role_exists": true, "pg_dist_object_record_for_role_exists": true, "stale_pg_dist_object_record_for_a_role_exists": false}
(3 rows)
SELECT citus_remove_node('localhost', :master_port);
citus_remove_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
-- they fail because the coordinator is not added to metadata
DROP ROLE test_role_renamed;
ERROR: coordinator is not added to the metadata
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
ALTER ROLE test_role_renamed RENAME TO test_role;
ERROR: coordinator is not added to the metadata
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
ALTER ROLE test_role_renamed CREATEDB;
ERROR: coordinator is not added to the metadata
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO OFF;
ERROR: coordinator is not added to the metadata
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
GRANT another_user TO test_role_renamed;
ERROR: coordinator is not added to the metadata
HINT: Use SELECT citus_set_coordinator_host('<hostname>') to configure the coordinator hostname
\c - - - :master_port
DROP ROLE test_role_renamed, another_user;
SET client_min_messages TO WARNING;
DROP SCHEMA role_command_from_any_node CASCADE;

View File

@ -328,8 +328,8 @@ RESET citus.shard_replication_factor;
 -- test some more error handling. We create them later there.
 SET citus.enable_create_role_propagation TO OFF;
 CREATE USER testrole;
-NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
-HINT: Connect to worker nodes directly to manually create all necessary users and roles.
+NOTICE: not propagating CREATE ROLE/USER commands to other nodes
+HINT: Connect to other nodes directly to manually create all necessary users and roles.
 GRANT ALL ON SCHEMA public TO testrole;
 ERROR: role "testrole" does not exist
 CONTEXT: while executing command on localhost:xxxxx
@ -731,8 +731,8 @@ ERROR: target node localhost:xxxxx is not responsive
 \c - - - :worker_1_port
 SET citus.enable_create_role_propagation TO OFF;
 CREATE USER testrole;
-NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
-HINT: Connect to worker nodes directly to manually create all necessary users and roles.
+NOTICE: not propagating CREATE ROLE/USER commands to other nodes
+HINT: Connect to other nodes directly to manually create all necessary users and roles.
 GRANT ALL ON SCHEMA public TO testrole;
 ERROR: operation is not allowed on this node
 HINT: Connect to the coordinator and run it again.
@ -745,8 +745,8 @@ ERROR: source node localhost:xxxxx is not responsive
 \c - - - :worker_2_port
 SET citus.enable_create_role_propagation TO OFF;
 CREATE USER testrole;
-NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
-HINT: Connect to worker nodes directly to manually create all necessary users and roles.
+NOTICE: not propagating CREATE ROLE/USER commands to other nodes
+HINT: Connect to other nodes directly to manually create all necessary users and roles.
 GRANT ALL ON SCHEMA public TO testrole;
 ERROR: operation is not allowed on this node
 HINT: Connect to the coordinator and run it again.

View File

@ -27,6 +27,7 @@ test: multi_cluster_management
 test: non_super_user_object_metadata
 test: propagate_foreign_servers
 test: alter_role_propagation
+test: role_command_from_any_node
 test: propagate_extension_commands
 test: escape_extension_name
 test: ref_citus_local_fkeys

View File

@ -25,15 +25,10 @@ SELECT master_remove_node('localhost', :worker_2_port);
 CREATE ROLE create_role_with_everything SUPERUSER CREATEDB CREATEROLE INHERIT LOGIN REPLICATION BYPASSRLS CONNECTION LIMIT 105 PASSWORD 'strong_password123^' VALID UNTIL '2045-05-05 00:00:00.00+00' IN ROLE create_role, create_group ROLE create_user, create_group_2 ADMIN create_role_2, create_user_2;
 CREATE ROLE create_role_with_nothing NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT NOLOGIN NOREPLICATION NOBYPASSRLS CONNECTION LIMIT 3 PASSWORD 'weakpassword' VALID UNTIL '2015-05-05 00:00:00.00+00';
--- show that creating role from worker node is only allowed when create role
--- propagation is off
+-- show that creating role from worker node is allowed
 \c - - - :worker_1_port
 CREATE ROLE role_on_worker;
-BEGIN;
-SET citus.enable_create_role_propagation TO off;
-CREATE ROLE role_on_worker;
-ROLLBACK;
+DROP ROLE role_on_worker;
 \c - - - :master_port

View File

@ -349,10 +349,7 @@ SELECT * FROM use_age_invalid ORDER BY 1;
 ALTER DOMAIN age_invalid VALIDATE CONSTRAINT check_age_positive;
 -- test changing the owner of a domain
-SET client_min_messages TO error;
-SELECT 1 FROM run_command_on_workers($$ CREATE ROLE domain_owner; $$);
 CREATE ROLE domain_owner;
-RESET client_min_messages;
 CREATE DOMAIN alter_domain_owner AS int;
 ALTER DOMAIN alter_domain_owner OWNER TO domain_owner;

View File

@ -24,8 +24,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
 SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
 ROLLBACK;
--- in a distributed transaction and the application name is Citus
--- but we are on the coordinator, so still not allowed
+-- in a distributed transaction and the application name is Citus, allowed.
 BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
 SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
 SET application_name to 'citus_internal gpid=10000000001';

View File

@ -41,23 +41,33 @@ CREATE TABLE ref(groupid int);
 SELECT create_reference_table('ref');
 \c - - - :worker_1_port
--- alter role from mx worker isn't allowed when alter role propagation is on
-SET citus.enable_alter_role_propagation TO ON;
-ALTER ROLE reprefuser WITH CREATEROLE;
--- to alter role locally disable alter role propagation first
+-- to alter role locally, disable alter role propagation first
 SET citus.enable_alter_role_propagation TO OFF;
 ALTER ROLE reprefuser WITH CREATEROLE;
-SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser';
-RESET citus.enable_alter_role_propagation;
-\c - - - :worker_2_port
--- show that altering role locally on worker doesn't propagated to other worker
-SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser';
+SELECT result from run_command_on_all_nodes(
+$$
+SELECT to_jsonb(q2.*) FROM (
+SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'
+) q2
+$$
+) ORDER BY result;
+-- alter role from mx worker is allowed
+SET citus.enable_alter_role_propagation TO ON;
+ALTER ROLE reprefuser WITH CREATEROLE;
+-- show that altering role locally on worker is propagated to coordinator and to other workers too
+SELECT result from run_command_on_all_nodes(
+$$
+SELECT to_jsonb(q2.*) FROM (
+SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser'
+) q2
+$$
+) ORDER BY result;
 \c - - - :master_port
 SET search_path TO mx_add_coordinator,public;
--- show that altering role locally on worker doesn't propagated to coordinator
-SELECT rolcreatedb, rolcreaterole FROM pg_roles WHERE rolname = 'reprefuser';
 SET citus.log_local_commands TO ON;
 SET client_min_messages TO DEBUG;

View File

@ -0,0 +1,174 @@
-- idempotently remove the coordinator from metadata
SELECT COUNT(citus_remove_node(nodename, nodeport)) >= 0 FROM pg_dist_node WHERE nodename = 'localhost' AND nodeport = :master_port;
-- make sure that CREATE ROLE from workers is not supported when coordinator is not added to metadata
SELECT result FROM run_command_on_workers('CREATE ROLE test_role');
\c - - - :master_port
CREATE SCHEMA role_command_from_any_node;
SET search_path TO role_command_from_any_node;
SET client_min_messages TO WARNING;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupid => 0);
CREATE OR REPLACE FUNCTION check_role_on_all_nodes(p_role_name text)
RETURNS TABLE (node_type text, result text)
AS $func$
DECLARE
v_worker_query text;
BEGIN
v_worker_query := format(
$$
SELECT to_jsonb(q1.*) FROM (
SELECT
(
SELECT COUNT(*) = 1 FROM pg_roles WHERE rolname = '%s'
) AS role_exists,
(
SELECT to_jsonb(q.*) FROM (SELECT * FROM pg_roles WHERE rolname = '%s') q
) AS role_properties,
(
SELECT COUNT(*) = 1
FROM pg_dist_object
WHERE objid = (SELECT oid FROM pg_roles WHERE rolname = '%s')
) AS pg_dist_object_record_for_role_exists,
(
SELECT COUNT(*) > 0
FROM pg_dist_object
WHERE classid = 1260 AND objid NOT IN (SELECT oid FROM pg_roles)
) AS stale_pg_dist_object_record_for_a_role_exists
) q1
$$,
p_role_name, p_role_name, p_role_name
);
RETURN QUERY
SELECT
CASE WHEN (groupid = 0 AND groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'coordinator (local)'
WHEN (groupid = 0) THEN 'coordinator (remote)'
WHEN (groupid = (SELECT groupid FROM pg_dist_local_group)) THEN 'worker node (local)'
ELSE 'worker node (remote)'
END AS node_type,
q2.result
FROM run_command_on_all_nodes(v_worker_query) q2
JOIN pg_dist_node USING (nodeid);
END;
$func$ LANGUAGE plpgsql;
\c - - - :worker_1_port
SET search_path TO role_command_from_any_node;
SET client_min_messages TO NOTICE;
SET citus.enable_create_role_propagation TO OFF;
CREATE ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
CREATE ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
SET citus.enable_create_role_propagation TO ON;
-- doesn't fail even if the role doesn't exist on other nodes
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
CREATE ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
DROP ROLE test_role;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role') ORDER BY node_type;
CREATE ROLE test_role;
SET citus.enable_alter_role_propagation TO OFF;
ALTER ROLE test_role RENAME TO test_role_renamed;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
ALTER ROLE test_role_renamed RENAME TO test_role;
SET citus.enable_alter_role_propagation TO ON;
ALTER ROLE test_role RENAME TO test_role_renamed;
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
SET citus.enable_alter_role_propagation TO OFF;
ALTER ROLE test_role_renamed CREATEDB;
SET citus.enable_alter_role_propagation TO ON;
SELECT node_type, (result::jsonb)->'role_properties'->'rolcreatedb' as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
ALTER ROLE test_role_renamed CREATEDB;
SELECT node_type, (result::jsonb)->'role_properties'->'rolcreatedb' as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
SET citus.enable_alter_role_set_propagation TO ON;
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO OFF;
SELECT result FROM run_command_on_all_nodes('SHOW enable_hashjoin') ORDER BY result;
SET citus.enable_alter_role_set_propagation TO OFF;
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO ON;
SELECT result FROM run_command_on_all_nodes('SHOW enable_hashjoin') ORDER BY result;
SET citus.enable_alter_role_set_propagation TO ON;
ALTER ROLE current_user IN DATABASE "regression" RESET enable_hashjoin;
CREATE ROLE another_user;
SET citus.enable_create_role_propagation TO OFF;
GRANT another_user TO test_role_renamed;
SELECT result FROM run_command_on_all_nodes($$
SELECT COUNT(*)=1 FROM pg_auth_members WHERE roleid = 'another_user'::regrole AND member = 'test_role_renamed'::regrole
$$) ORDER BY result;
SET citus.enable_create_role_propagation TO ON;
SET client_min_messages TO ERROR;
GRANT another_user TO test_role_renamed;
SET client_min_messages TO NOTICE;
SELECT result FROM run_command_on_all_nodes($$
SELECT COUNT(*)=1 FROM pg_auth_members WHERE roleid = 'another_user'::regrole AND member = 'test_role_renamed'::regrole
$$) ORDER BY result;
\c - - - :master_port
SET search_path TO role_command_from_any_node;
SET client_min_messages TO NOTICE;
SELECT citus_remove_node('localhost', :worker_1_port);
SELECT 1 FROM citus_add_node('localhost', :worker_1_port);
-- make sure that citus_add_node() propagates the roles created via a worker
SELECT node_type, (result::jsonb - 'role_properties') as result FROM check_role_on_all_nodes('test_role_renamed') ORDER BY node_type;
SELECT citus_remove_node('localhost', :master_port);
\c - - - :worker_1_port
-- they fail because the coordinator is not added to metadata
DROP ROLE test_role_renamed;
ALTER ROLE test_role_renamed RENAME TO test_role;
ALTER ROLE test_role_renamed CREATEDB;
ALTER ROLE current_user IN DATABASE "regression" SET enable_hashjoin TO OFF;
GRANT another_user TO test_role_renamed;
\c - - - :master_port
DROP ROLE test_role_renamed, another_user;
SET client_min_messages TO WARNING;
DROP SCHEMA role_command_from_any_node CASCADE;