diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index a8fa5c118..19fc94646 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -368,8 +368,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) /* since we are executing ddl commands lets disable propagation, primarily for mx */ ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + /* send commands to new workers, the current user should a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, + nodePort, + CurrentUserName(), + ddlCommands); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index bd9fb55d9..63ac316a8 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -641,11 +641,12 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - /* send commands to new workers*/ - SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CurrentUserName(), + ddlCommands); } } @@ -851,6 +852,19 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* + * We currently require the object propagation to happen via superuser, + * see #5139. While activating a node, we sync both metadata and object + * propagation. + * + * In order to have a fully transactional semantics with add/activate + * node operations, we require superuser. Note that for creating + * non-owned objects, we already require a superuser connection. + * By ensuring the current user to be a superuser, we can guarantee + * to send all commands within the same remote transaction. + */ + EnsureSuperUser(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index c20e38034..50ee92d0e 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -45,8 +45,9 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery( ShardPlacement *sourceShardPlacement, WorkerNode *workerNode, char transferMode); -static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, - int nodePort); +static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, + char *nodeName, + int nodePort); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); @@ -336,7 +337,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) * table. */ static void -ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) +ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName, + int nodePort) { uint64 shardId = shardInterval->shardId; @@ -351,8 +353,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - if (targetPlacement != NULL) { if (targetPlacement->shardState == SHARD_STATE_ACTIVE) @@ -370,9 +370,11 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) get_rel_name(shardInterval->relationId), nodeName, nodePort))); - EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -587,17 +589,19 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) LockShardDistributionMetadata(shardId, ExclusiveLock); - ReplicateShardToNode(shardInterval, nodeName, nodePort); + ReplicateReferenceTableShardToNode(shardInterval, nodeName, nodePort); } /* create foreign constraints between reference tables */ foreach_ptr(shardInterval, referenceShardIntervalList) { - char *tableOwner = TableOwner(shardInterval->relationId); List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - commandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + commandList); } } } diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index b851d6909..5d3d9f26e 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -270,6 +270,9 @@ ERROR: permission denied for function master_update_node -- try to manipulate node metadata via privileged user SET ROLE node_metadata_user; SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); +ERROR: operation is not allowed +HINT: Run the command with a superuser. BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); ?column? @@ -277,29 +280,13 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); -WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker -DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) -SELECT 1 FROM master_add_node('localhost', :worker_2_port); -WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker -DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port); +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 @@ -308,16 +295,14 @@ SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localh SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; master_update_node --------------------------------------------------------------------- - -(1 row) +(0 rows) SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; nodename | nodeport | noderole --------------------------------------------------------------------- localhost | 57637 | primary localhost | 57640 | secondary - localhost | 57641 | primary -(3 rows) +(2 rows) ABORT; \c - postgres - :master_port diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 20f0c8f16..df0a8e389 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -126,12 +126,11 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_ -- try to manipulate node metadata via privileged user SET ROLE node_metadata_user; SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT 1 FROM master_remove_node('localhost', :worker_2_port); -SELECT 1 FROM master_add_node('localhost', :worker_2_port); -SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port); +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; ABORT;