allow "alter schema rename to / owner to" from any node

ddl-from-any-node-phase-1
Onur Tirtir 2025-10-24 14:12:07 +03:00
parent 25507a5804
commit 578b996401
5 changed files with 74 additions and 16 deletions

View File

@ -31,6 +31,13 @@
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
static List * PreprocessAlterDistributedObjectStmtInternal(Node *stmt,
const char *queryString,
ProcessUtilityContext
processUtilityContext,
bool allowFromWorkers);
/* /*
* PostprocessCreateDistributedObjectFromCatalogStmt is a common function that can be used * PostprocessCreateDistributedObjectFromCatalogStmt is a common function that can be used
* for most objects during their creation phase. After the creation has happened locally * for most objects during their creation phase. After the creation has happened locally
@ -104,9 +111,39 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
/* /*
* PreprocessAlterDistributedObjectStmtFromCoordinator handles any updates to distributed * PreprocessAlterDistributedObjectStmtFromCoordinator is a wrapper around
* objects by creating the fully qualified sql to apply to all workers after checking all * PreprocessAlterDistributedObjectStmtInternal to be used when altering distributed
* predconditions that apply to propagating changes. * objects that we allow altering only from the coordinator.
*/
List *
PreprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
return PreprocessAlterDistributedObjectStmtInternal(stmt, queryString,
processUtilityContext, false);
}
/*
* PreprocessAlterDistributedObjectStmtFromCoordinator is a wrapper around
* PreprocessAlterDistributedObjectStmtInternal to be used when altering distributed
* objects that we allow altering only from the coordinator.
*/
List *
PreprocessAlterDistributedObjectStmtAnyNodes(Node *stmt, const char *queryString,
ProcessUtilityContext
processUtilityContext)
{
return PreprocessAlterDistributedObjectStmtInternal(stmt, queryString,
processUtilityContext, true);
}
/*
* PreprocessAlterDistributedObjectStmtInternal handles any updates to distributed
* objects by creating the fully qualified sql to apply to all other nodes after checking
* all predconditions that apply to propagating changes.
* *
* Preconditions are (in order): * Preconditions are (in order):
* - not in a CREATE/ALTER EXTENSION code block * - not in a CREATE/ALTER EXTENSION code block
@ -114,18 +151,18 @@ PostprocessCreateDistributedObjectFromCatalogStmt(Node *stmt, const char *queryS
* - object being altered is distributed * - object being altered is distributed
* - any object specific feature flag is turned on when a feature flag is available * - any object specific feature flag is turned on when a feature flag is available
* *
* Once we conclude to propagate the changes to the workers we make sure that the command * Once we conclude to propagate the changes to other nodes we make sure that the command
* has been executed on the coordinator and force any ongoing transaction to run in * has been executed on the local node and force any ongoing transaction to run in
* sequential mode. If any of these steps fail we raise an error to inform the user. * sequential mode. If any of these steps fail we raise an error to inform the user.
* *
* Lastly we recreate a fully qualified version of the original sql and prepare the tasks * Lastly we recreate a fully qualified version of the original sql and prepare the tasks
* to send these sql commands to the workers. These tasks include instructions to prevent * to send these sql commands to other nodes. These tasks include instructions to prevent
* recursion of propagation with Citus' MX functionality. * recursion of propagation with Citus' MX functionality.
*/ */
List * static List *
PreprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt, const char *queryString, PreprocessAlterDistributedObjectStmtInternal(Node *stmt, const char *queryString,
ProcessUtilityContext ProcessUtilityContext processUtilityContext,
processUtilityContext) bool allowFromWorkers)
{ {
const DistributeObjectOps *ops = GetDistributeObjectOps(stmt); const DistributeObjectOps *ops = GetDistributeObjectOps(stmt);
Assert(ops != NULL); Assert(ops != NULL);
@ -146,7 +183,15 @@ PreprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt, const char *quer
return NIL; return NIL;
} }
EnsureCoordinator(); if (allowFromWorkers)
{
EnsurePropagationToCoordinator();
}
else
{
EnsureCoordinator();
}
EnsureSequentialMode(ops->objectType); EnsureSequentialMode(ops->objectType);
QualifyTreeNode(stmt); QualifyTreeNode(stmt);
@ -156,7 +201,7 @@ PreprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt, const char *quer
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }

View File

@ -1190,7 +1190,7 @@ static DistributeObjectOps Routine_Rename = {
static DistributeObjectOps Schema_AlterOwner = { static DistributeObjectOps Schema_AlterOwner = {
.deparse = DeparseAlterSchemaOwnerStmt, .deparse = DeparseAlterSchemaOwnerStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmtFromCoordinator, .preprocess = PreprocessAlterDistributedObjectStmtAnyNodes,
.operationType = DIST_OPS_ALTER, .operationType = DIST_OPS_ALTER,
.postprocess = NULL, .postprocess = NULL,
.address = AlterSchemaOwnerStmtObjectAddress, .address = AlterSchemaOwnerStmtObjectAddress,
@ -1217,7 +1217,7 @@ static DistributeObjectOps Schema_Grant = {
static DistributeObjectOps Schema_Rename = { static DistributeObjectOps Schema_Rename = {
.deparse = DeparseAlterSchemaRenameStmt, .deparse = DeparseAlterSchemaRenameStmt,
.qualify = NULL, .qualify = NULL,
.preprocess = PreprocessAlterDistributedObjectStmtFromCoordinator, .preprocess = PreprocessAlterDistributedObjectStmtAnyNodes,
.postprocess = NULL, .postprocess = NULL,
.objectType = OBJECT_SCHEMA, .objectType = OBJECT_SCHEMA,
.operationType = DIST_OPS_ALTER, .operationType = DIST_OPS_ALTER,

View File

@ -191,6 +191,10 @@ extern List * PreprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt,
const char *queryString, const char *queryString,
ProcessUtilityContext ProcessUtilityContext
processUtilityContext); processUtilityContext);
extern List * PreprocessAlterDistributedObjectStmtAnyNodes(Node *stmt,
const char *queryString,
ProcessUtilityContext
processUtilityContext);
extern List * PostprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt, extern List * PostprocessAlterDistributedObjectStmtFromCoordinator(Node *stmt,
const char *queryString); const char *queryString);
extern List * PreprocessDropDistributedObjectStmt(Node *node, const char *queryString, extern List * PreprocessDropDistributedObjectStmt(Node *node, const char *queryString,

View File

@ -487,9 +487,16 @@ SELECT table_schema AS "Shards' Schema"
(1 row) (1 row)
-- Show that altering distributed schema is not allowed on worker nodes -- Show that altering distributed schema is not allowed on worker nodes
-- when the coordinator is not in the metadata.
SELECT COUNT(*)=0 FROM pg_dist_node WHERE groupid = 0; -- verify that the coordinator is not in the metadata
?column?
---------------------------------------------------------------------
t
(1 row)
ALTER SCHEMA mx_old_schema RENAME TO temp_mx_old_schema; ALTER SCHEMA mx_old_schema RENAME TO temp_mx_old_schema;
ERROR: operation is not allowed on this node ERROR: coordinator is not added to the metadata
HINT: Connect to the coordinator and run it again. HINT: Use SELECT citus_set_coordinator_host('<hostname>') on coordinator to configure the coordinator hostname
\c - - - :master_port \c - - - :master_port
ALTER TABLE mx_old_schema.table_set_schema SET SCHEMA mx_new_schema; ALTER TABLE mx_old_schema.table_set_schema SET SCHEMA mx_new_schema;
SELECT objid::oid::regnamespace::text as "Distributed Schemas" SELECT objid::oid::regnamespace::text as "Distributed Schemas"

View File

@ -321,6 +321,8 @@ SELECT table_schema AS "Shards' Schema"
GROUP BY table_schema; GROUP BY table_schema;
-- Show that altering distributed schema is not allowed on worker nodes -- Show that altering distributed schema is not allowed on worker nodes
-- when the coordinator is not in the metadata.
SELECT COUNT(*)=0 FROM pg_dist_node WHERE groupid = 0; -- verify that the coordinator is not in the metadata
ALTER SCHEMA mx_old_schema RENAME TO temp_mx_old_schema; ALTER SCHEMA mx_old_schema RENAME TO temp_mx_old_schema;
\c - - - :master_port \c - - - :master_port