From 8388b80dd3601160f8459b8ecaba5dae12db2329 Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Fri, 24 Oct 2025 14:12:33 +0300
Subject: [PATCH] allow "create / drop schema" from any node

---
 src/backend/distributed/commands/schema.c     |   8 +-
 .../commands/schema_based_sharding.c          |  20 +++-
 .../distributed/metadata/metadata_sync.c      |   4 +-
 .../distributed/sql/citus--13.2-1--14.0-1.sql |   1 +
 .../sql/downgrades/citus--14.0-1--13.2-1.sql  |   2 +
 .../14.0-1.sql                                |   6 +
 .../latest.sql                                |   6 +
 .../distributed/utils/colocation_utils.c      | 110 +++++++++++++++++-
 src/include/distributed/colocation_utils.h    |   1 -
 src/test/regress/expected/multi_extension.out |   5 +-
 .../expected/schema_based_sharding.out        |  14 +--
 .../expected/upgrade_list_citus_objects.out   |   3 +-
 .../regress/sql/schema_based_sharding.sql     |   6 +-
 13 files changed, 158 insertions(+), 28 deletions(-)
 create mode 100644 src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/14.0-1.sql
 create mode 100644 src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/latest.sql

diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c
index b079fe3f6..3ca166c6d 100644
--- a/src/backend/distributed/commands/schema.c
+++ b/src/backend/distributed/commands/schema.c
@@ -64,7 +64,7 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
 		return NIL;
 	}
 
-	EnsureCoordinator();
+	EnsurePropagationToCoordinator();
 
 	EnsureSequentialMode(OBJECT_SCHEMA);
 
@@ -130,7 +130,7 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
 
 	commands = lappend(commands, ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
+	return NodeDDLTaskList(REMOTE_NODES, commands);
 }
 
 
@@ -157,7 +157,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
 		return NIL;
 	}
 
-	EnsureCoordinator();
+	EnsurePropagationToCoordinator();
 
 	EnsureSequentialMode(OBJECT_SCHEMA);
 
@@ -190,7 +190,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
 								 (void *) sql,
 								 ENABLE_DDL_PROPAGATION);
 
-	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
+	return NodeDDLTaskList(REMOTE_NODES, commands);
 }
diff --git a/src/backend/distributed/commands/schema_based_sharding.c b/src/backend/distributed/commands/schema_based_sharding.c
index 6635d6817..57c119b76 100644
--- a/src/backend/distributed/commands/schema_based_sharding.c
+++ b/src/backend/distributed/commands/schema_based_sharding.c
@@ -553,7 +553,7 @@ UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName)
 	DeleteTenantSchemaLocally(schemaId);
 	if (EnableMetadataSync)
 	{
-		SendCommandToWorkersWithMetadata(TenantSchemaDeleteCommand(schemaName));
+		SendCommandToRemoteNodesWithMetadata(TenantSchemaDeleteCommand(schemaName));
 	}
 
 	DeleteColocationGroup(tenantSchemaColocationId);
@@ -579,10 +579,22 @@ citus_internal_unregister_tenant_schema_globally(PG_FUNCTION_ARGS)
 	char *schemaNameStr = text_to_cstring(schemaName);
 
 	/*
-	 * Skip on workers because we expect this to be called from the coordinator
-	 * only via drop hook.
+	 * We have this check to make sure that we execute this only on the backend
+	 * executing the distributed "DROP SCHEMA" command (not on the internal
+	 * backends that propagate the DDL to remote nodes). This prevents other
+	 * nodes from trying to unregister the same tenant schema globally, since
+	 * the backend executing the distributed "DROP SCHEMA" command already
+	 * does so globally via this function.
+	 *
+	 * Actually, even if we didn't have this check, the other nodes would still be
+	 * prevented from trying to unregister the same tenant schema globally. This
+	 * is because, when dropping a distributed schema, we first delete the tenant
+	 * schema from metadata globally and then we drop the schema itself on other
+	 * nodes. So, when the drop hook is called on other nodes, it would not try to
+	 * unregister the tenant schema globally since the schema would not be found
+	 * in the tenant schema metadata. However, having this check makes it more
+	 * explicit and guards us against future changes.
 	 */
-	if (!IsCoordinator())
+	if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
 	{
 		PG_RETURN_VOID();
 	}
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 391444856..892a784b9 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -4179,7 +4179,7 @@ SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicati
 	 * We require superuser for all pg_dist_colocation operations because we have
 	 * no reasonable way of restricting access.
 	 */
-	SendCommandToWorkersWithMetadataViaSuperUser(command);
+	SendCommandToRemoteNodesWithMetadataViaSuperUser(command);
 }
 
 
@@ -4292,7 +4292,7 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId)
 	 * We require superuser for all pg_dist_colocation operations because we have
 	 * no reasonable way of restricting access.
 	 */
-	SendCommandToWorkersWithMetadataViaSuperUser(command);
+	SendCommandToRemoteNodesWithMetadataViaSuperUser(command);
 }
diff --git a/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql b/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql
index 815d4e794..8ad7a45ac 100644
--- a/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql
+++ b/src/backend/distributed/sql/citus--13.2-1--14.0-1.sql
@@ -3,3 +3,4 @@
 
 #include "udfs/citus_prepare_pg_upgrade/14.0-1.sql"
 #include "udfs/citus_finish_pg_upgrade/14.0-1.sql"
+#include "udfs/citus_internal_get_next_colocation_id/14.0-1.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql b/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql
index 199030339..4b034f07f 100644
--- a/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--14.0-1--13.2-1.sql
@@ -3,3 +3,5 @@
 
 #include "../udfs/citus_prepare_pg_upgrade/13.0-1.sql"
 #include "../udfs/citus_finish_pg_upgrade/13.2-1.sql"
+
+DROP FUNCTION IF EXISTS citus_internal.get_next_colocation_id();
diff --git a/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/14.0-1.sql b/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/14.0-1.sql
new file mode 100644
index 000000000..e6df45559
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/14.0-1.sql
@@ -0,0 +1,6 @@
+CREATE OR REPLACE FUNCTION citus_internal.get_next_colocation_id()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$citus_internal_get_next_colocation_id$$;
+COMMENT ON FUNCTION citus_internal.get_next_colocation_id()
+    IS 'retrieves the next colocation id from pg_dist_colocationid_seq';
diff --git a/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/latest.sql
new file mode 100644
index 000000000..e6df45559
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_internal_get_next_colocation_id/latest.sql
@@ -0,0 +1,6 @@
+CREATE OR REPLACE FUNCTION citus_internal.get_next_colocation_id()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$citus_internal_get_next_colocation_id$$;
+COMMENT ON FUNCTION citus_internal.get_next_colocation_id()
+    IS 'retrieves the next colocation id from pg_dist_colocationid_seq';
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index 816e3ce2a..9704c1d00 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -30,12 +30,14 @@
 #include "distributed/commands.h"
 #include "distributed/coordinator_protocol.h"
 #include "distributed/listutils.h"
+#include "distributed/lock_graph.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/metadata_utility.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/pg_dist_colocation.h"
+#include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/tenant_schema_metadata.h"
@@ -55,6 +57,9 @@ static int CompareShardPlacementsByNode(const void *leftElement,
 										const void *rightElement);
 static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
 static void BreakColocation(Oid sourceRelationId);
+static uint32 GetNextColocationId(void);
+static int64 GetNextColocationIdFromNode(WorkerNode *node);
+static uint32 GetNextColocationIdInternal(void);
 static uint32 SingleShardTableGetNodeId(Oid relationId);
 
 
@@ -62,6 +67,7 @@ static uint32 SingleShardTableGetNodeId(Oid relationId);
 PG_FUNCTION_INFO_V1(mark_tables_colocated);
 PG_FUNCTION_INFO_V1(get_colocated_shard_array);
 PG_FUNCTION_INFO_V1(update_distributed_table_colocation);
+PG_FUNCTION_INFO_V1(citus_internal_get_next_colocation_id);
 
 
 /*
@@ -643,7 +649,105 @@ InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicatio
 
 
 /*
- * GetNextColocationId allocates and returns a unique colocationId for the
+ * GetNextColocationId retrieves the next colocation id, either from the local
+ * node if it is the coordinator, or from the coordinator otherwise.
+ *
+ * In the latter case, it throws an error if the coordinator is not in the metadata.
+ */
+static uint32
+GetNextColocationId(void)
+{
+	uint32 colocationId = INVALID_COLOCATION_ID;
+	if (IsCoordinator())
+	{
+		colocationId = GetNextColocationIdInternal();
+	}
+	else
+	{
+		/*
+		 * If we're not on the coordinator, retrieve the next id from the
+		 * coordinator node. Although all nodes have the sequence, we don't
+		 * synchronize the sequences that are part of the Citus metadata
+		 * across nodes, so we need to get the next value from the
+		 * coordinator.
+		 *
+		 * Note that before this point, we should have already verified
+		 * that the coordinator is added to the metadata.
+		 */
+		WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
+		colocationId = GetNextColocationIdFromNode(coordinator);
+	}
+
+	return colocationId;
+}
+
+
+/*
+ * GetNextColocationIdFromNode gets the next colocation id from the given
+ * node by calling the citus_internal.get_next_colocation_id() function.
+ */
+static int64
+GetNextColocationIdFromNode(WorkerNode *node)
+{
+	const char *nodeName = node->workerName;
+	int nodePort = node->workerPort;
+	uint32 connectionFlags = 0;
+	MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
+
+	int querySent = SendRemoteCommand(connection,
+									  "SELECT citus_internal.get_next_colocation_id();");
+	if (querySent == 0)
+	{
+		ReportConnectionError(connection, ERROR);
+	}
+
+	bool raiseInterrupts = true;
+	PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
+	if (!IsResponseOK(result))
+	{
+		ReportResultError(connection, result, ERROR);
+	}
+
+	int64 rowCount = PQntuples(result);
+	int64 colCount = PQnfields(result);
+	if (rowCount != 1 || colCount != 1)
+	{
+		ereport(ERROR, (errmsg("unexpected result from the node when getting "
+							   "next colocation id")));
+	}
+
+	int64 colocationId = ParseIntField(result, 0, 0);
+
+	PQclear(result);
+	ForgetResults(connection);
+
+	return colocationId;
+}
+
+
+/*
+ * citus_internal_get_next_colocation_id is a wrapper around
+ * GetNextColocationIdInternal().
+ */
+Datum
+citus_internal_get_next_colocation_id(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+	EnsureCoordinator();
+
+	/* TODOTASK: */
+	/* Ensure that the user can create schemas. It's not ideal to have such a check */
+	/* here, but it's needed to prevent unprivileged users from consuming colocation ids. */
+
+	uint32 colocationId = GetNextColocationIdInternal();
+	Datum colocationIdDatum = Int64GetDatum(colocationId);
+
+	PG_RETURN_DATUM(colocationIdDatum);
+}
+
+
+/*
+ * GetNextColocationIdInternal allocates and returns a unique colocationId for the
  * colocation group to be created. This allocation occurs both in shared memory
  * and in write ahead logs; writing to logs avoids the risk of having
  * colocationId collisions.
@@ -652,8 +756,8 @@ InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicatio
  * with the master node. Further note that this function relies on an internal
  * sequence created in initdb to generate unique identifiers.
  */
-uint32
-GetNextColocationId()
+static uint32
+GetNextColocationIdInternal(void)
 {
 	text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
 	Oid sequenceId = ResolveRelationId(sequenceName, false);
diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h
index 018f97570..28c4e7753 100644
--- a/src/include/distributed/colocation_utils.h
+++ b/src/include/distributed/colocation_utils.h
@@ -39,7 +39,6 @@ extern void InsertColocationGroupLocally(uint32 colocationId, int shardCount,
 										 Oid distributionColumnCollation);
 extern bool IsColocateWithNone(char *colocateWithTableName);
 extern bool IsColocateWithDefault(char *colocateWithTableName);
-extern uint32 GetNextColocationId(void);
 extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
 extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
 extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index 99774d9ef..13bce8f52 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1665,9 +1665,10 @@ SELECT * FROM multi_extension.print_extension_changes();
 -- Snapshot of state at 14.0-1
 ALTER EXTENSION citus UPDATE TO '14.0-1';
 SELECT * FROM multi_extension.print_extension_changes();
- previous_object | current_object
+ previous_object |                      current_object
 ---------------------------------------------------------------------
-(0 rows)
+ | function citus_internal.get_next_colocation_id() bigint
+(1 row)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/schema_based_sharding.out b/src/test/regress/expected/schema_based_sharding.out
index 711c39141..7e31cb597 100644
--- a/src/test/regress/expected/schema_based_sharding.out
+++ b/src/test/regress/expected/schema_based_sharding.out
@@ -1490,8 +1490,7 @@ HINT:  Connect to the coordinator node and try again.
 -- test creating a tenant schema from workers
 SET citus.enable_schema_based_sharding TO ON;
 CREATE SCHEMA worker_tenant_schema;
-ERROR:  operation is not allowed on this node
-HINT:  Connect to the coordinator and run it again.
+DROP SCHEMA worker_tenant_schema;
 SET citus.enable_schema_based_sharding TO OFF;
 -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
 -- commands that we send to workers don't recursively try creating a
@@ -1511,14 +1510,11 @@ SELECT pg_reload_conf();
  t
 (1 row)
 
--- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
--- on workers.
+-- Verify that citus_internal.unregister_tenant_schema_globally can be called
+-- from workers too, but it will fail in this case because we have not dropped
+-- the schema yet.
 SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
- unregister_tenant_schema_globally
---------------------------------------------------------------------
-
-(1 row)
-
+ERROR:  schema is expected to be already dropped because this function is only expected to be called from Citus drop hook
 \c - - - :master_port
 SET search_path TO regular_schema;
 SET citus.next_shard_id TO 1950000;
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 76121c3ef..501bb5c31 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -96,6 +96,7 @@ ORDER BY 1;
  function citus_internal.delete_shard_metadata(bigint)
  function citus_internal.delete_tenant_schema(oid)
  function citus_internal.find_groupid_for_node(text,integer)
+ function citus_internal.get_next_colocation_id()
  function citus_internal.global_blocked_processes()
  function citus_internal.is_replication_origin_tracking_active()
  function citus_internal.local_blocked_processes()
@@ -402,6 +403,6 @@ ORDER BY 1;
  view citus_tables
  view pg_dist_shard_placement
  view time_partitions
-(370 rows)
+(371 rows)
 
 DROP TABLE extension_basic_types;
diff --git a/src/test/regress/sql/schema_based_sharding.sql b/src/test/regress/sql/schema_based_sharding.sql
index f0b2276df..ad437e7d8 100644
--- a/src/test/regress/sql/schema_based_sharding.sql
+++ b/src/test/regress/sql/schema_based_sharding.sql
@@ -1009,6 +1009,7 @@ CREATE TABLE tenant_3.tbl_1(a int, b text);
 -- test creating a tenant schema from workers
 SET citus.enable_schema_based_sharding TO ON;
 CREATE SCHEMA worker_tenant_schema;
+DROP SCHEMA worker_tenant_schema;
 SET citus.enable_schema_based_sharding TO OFF;
 
 -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
@@ -1022,8 +1023,9 @@ SELECT pg_reload_conf();
 ALTER SYSTEM SET citus.enable_schema_based_sharding TO ON;
 SELECT pg_reload_conf();
 
--- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
--- on workers.
+-- Verify that citus_internal.unregister_tenant_schema_globally can be called
+-- from workers too, but it will fail in this case because we have not dropped
+-- the schema yet.
 SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
 
 \c - - - :master_port
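
Not part of the patch, just an illustration for reviewers: a minimal usage
sketch of what this change enables, run from any node that has metadata (for
example a worker), assuming the coordinator is registered in pg_dist_node and
schema-based sharding is enabled. The schema and table names (tenant_x,
tenant_x.users) are placeholders, not objects used by the patch or its tests.

    SET citus.enable_schema_based_sharding TO ON;

    -- previously coordinator-only; with this patch the colocation id for the
    -- new tenant schema is fetched from the coordinator via
    -- citus_internal.get_next_colocation_id()
    CREATE SCHEMA tenant_x;
    CREATE TABLE tenant_x.users (id bigint PRIMARY KEY);

    -- the drop hook on the backend executing the DROP unregisters the tenant
    -- schema globally; internal backends on other nodes skip that step
    DROP SCHEMA tenant_x CASCADE;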