allow "create / drop schema" from any node

ddl-from-any-node-phase-1
Onur Tirtir 2025-10-24 14:12:33 +03:00
parent 577aa22a6c
commit 8388b80dd3
13 changed files with 158 additions and 28 deletions

View File

@@ -64,7 +64,7 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
return NIL;
}
EnsureCoordinator();
EnsurePropagationToCoordinator();
EnsureSequentialMode(OBJECT_SCHEMA);
@@ -130,7 +130,7 @@ PostprocessCreateSchemaStmt(Node *node, const char *queryString)
commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}
@@ -157,7 +157,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
return NIL;
}
EnsureCoordinator();
EnsurePropagationToCoordinator();
EnsureSequentialMode(OBJECT_SCHEMA);
@@ -190,7 +190,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
(void *) sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
return NodeDDLTaskList(REMOTE_NODES, commands);
}
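
The net effect of replacing EnsureCoordinator() with EnsurePropagationToCoordinator() and NON_COORDINATOR_NODES with REMOTE_NODES is that CREATE SCHEMA / DROP SCHEMA can now be initiated from any node that has metadata, with the DDL propagated to all remote nodes including the coordinator. A minimal usage sketch, assuming a metadata-synced worker and a coordinator that has already been added to the metadata; node names and ports follow the regression-test convention and are illustrative only:

\c - - - :worker_1_port
-- before this change, this errored with "operation is not allowed on this node";
-- now the command runs locally and is propagated to all other nodes
CREATE SCHEMA schema_from_worker;
DROP SCHEMA schema_from_worker;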

View File

@@ -553,7 +553,7 @@ UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName)
DeleteTenantSchemaLocally(schemaId);
if (EnableMetadataSync)
{
SendCommandToWorkersWithMetadata(TenantSchemaDeleteCommand(schemaName));
SendCommandToRemoteNodesWithMetadata(TenantSchemaDeleteCommand(schemaName));
}
DeleteColocationGroup(tenantSchemaColocationId);
@@ -579,10 +579,22 @@ citus_internal_unregister_tenant_schema_globally(PG_FUNCTION_ARGS)
char *schemaNameStr = text_to_cstring(schemaName);
/*
* Skip on workers because we expect this to be called from the coordinator
* only via drop hook.
* Have this check to make sure we execute this only on the backend executing
* the distributed "DROP SCHEMA" command, not on internal backends propagating
* the DDL to remote nodes, to prevent other nodes from trying to unregister
* the same tenant schema globally, since the backend executing the distributed
* "DROP SCHEMA" command already does so globally via this function.
*
* Actually, even if we didn't have this check, the other nodes would still be
* prevented from trying to unregister the same tenant schema globally. This
* is because, when dropping a distributed schema, we first delete the tenant
* schema from metadata globally and then we drop the schema itself on other
* nodes. So, when the drop hook is called on other nodes, it would not try to
* unregister the tenant schema globally since the schema would not be found
* in the tenant schema metadata. However, having this check makes it more
* explicit and guards us against future changes.
*/
if (!IsCoordinator())
if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
{
PG_RETURN_VOID();
}

View File

@@ -4179,7 +4179,7 @@ SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicati
* We require superuser for all pg_dist_colocation operations because we have
* no reasonable way of restricting access.
*/
SendCommandToWorkersWithMetadataViaSuperUser(command);
SendCommandToRemoteNodesWithMetadataViaSuperUser(command);
}
@@ -4292,7 +4292,7 @@ SyncDeleteColocationGroupToNodes(uint32 colocationId)
* We require superuser for all pg_dist_colocation operations because we have
* no reasonable way of restricting access.
*/
SendCommandToWorkersWithMetadataViaSuperUser(command);
SendCommandToRemoteNodesWithMetadataViaSuperUser(command);
}

View File

@@ -3,3 +3,4 @@
#include "udfs/citus_prepare_pg_upgrade/14.0-1.sql"
#include "udfs/citus_finish_pg_upgrade/14.0-1.sql"
#include "udfs/citus_internal_get_next_colocation_id/14.0-1.sql"

View File

@@ -3,3 +3,5 @@
#include "../udfs/citus_prepare_pg_upgrade/13.0-1.sql"
#include "../udfs/citus_finish_pg_upgrade/13.2-1.sql"
DROP FUNCTION IF EXISTS citus_internal.get_next_colocation_id();

View File

@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION citus_internal.get_next_colocation_id()
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_get_next_colocation_id$$;
COMMENT ON FUNCTION citus_internal.get_next_colocation_id()
IS 'retrieves the next colocation id from pg_dist_colocationid_seq';

View File

@@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION citus_internal.get_next_colocation_id()
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_get_next_colocation_id$$;
COMMENT ON FUNCTION citus_internal.get_next_colocation_id()
IS 'retrieves the next colocation id from pg_dist_colocationid_seq';
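
For reference, a usage sketch of the new internal UDF; it is expected to be called on the coordinator (the C implementation calls EnsureCoordinator()), typically by another node that needs a fresh colocation id. The call below is illustrative only:

-- returns the next value of the colocation id sequence as bigint
SELECT citus_internal.get_next_colocation_id();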

View File

@@ -30,12 +30,14 @@
#include "distributed/commands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/tenant_schema_metadata.h"
@@ -55,6 +57,9 @@ static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
static void BreakColocation(Oid sourceRelationId);
static uint32 GetNextColocationId(void);
static int64 GetNextColocationIdFromNode(WorkerNode *node);
static uint32 GetNextColocationIdInternal(void);
static uint32 SingleShardTableGetNodeId(Oid relationId);
@@ -62,6 +67,7 @@ static uint32 SingleShardTableGetNodeId(Oid relationId);
PG_FUNCTION_INFO_V1(mark_tables_colocated);
PG_FUNCTION_INFO_V1(get_colocated_shard_array);
PG_FUNCTION_INFO_V1(update_distributed_table_colocation);
PG_FUNCTION_INFO_V1(citus_internal_get_next_colocation_id);
/*
@@ -643,7 +649,105 @@ InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicatio
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* GetNextColocationId retrieves the next colocation id from the local node if
* it's the coordinator, or from the coordinator otherwise.
*
* In the latter case, throws an error if the coordinator is not in the metadata.
*/
static uint32
GetNextColocationId(void)
{
uint32 colocationId = INVALID_COLOCATION_ID;
if (IsCoordinator())
{
colocationId = GetNextColocationIdInternal();
}
else
{
/*
* If we're not on the coordinator, retrieve the next id from the
* coordinator node. Although all nodes have the sequence, we don't
* synchronize the sequences that are part of the Citus metadata
* across nodes, so we need to get the next value from the
* coordinator.
*
* Note that before this point, we should have already verified
* that the coordinator has been added to the metadata.
*/
WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
colocationId = GetNextColocationIdFromNode(coordinator);
}
return colocationId;
}
/*
* GetNextColocationIdFromNode gets the next colocation id from the given
* node by calling the citus_internal.get_next_colocation_id() function.
*/
static int64
GetNextColocationIdFromNode(WorkerNode *node)
{
const char *nodeName = node->workerName;
int nodePort = node->workerPort;
uint32 connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
int querySent = SendRemoteCommand(connection,
"SELECT citus_internal.get_next_colocation_id();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
if (rowCount != 1 || colCount != 1)
{
ereport(ERROR, (errmsg("unexpected result from the node when getting "
"next colocation id")));
}
int64 colocationId = ParseIntField(result, 0, 0);
PQclear(result);
ForgetResults(connection);
return colocationId;
}
/*
* citus_internal_get_next_colocation_id is a wrapper around
* GetNextColocationIdInternal().
*/
Datum
citus_internal_get_next_colocation_id(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureCoordinator();
/* TODOTASK: */
/* ensure the user can create schemas; it won't be super ideal to have such a check here, */
/* but it's needed to prevent unprivileged users from consuming colocation ids. */
uint32 colocationId = GetNextColocationIdInternal();
Datum colocationIdDatum = Int64GetDatum(colocationId);
PG_RETURN_DATUM(colocationIdDatum);
}
/*
* GetNextColocationIdInternal allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
@@ -652,8 +756,8 @@ InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicatio
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
uint32
GetNextColocationId()
static uint32
GetNextColocationIdInternal(void)
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName, false);
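
To make the new control flow concrete: when a command that needs a fresh colocation id runs on a worker (for example, creating a tenant schema while citus.enable_schema_based_sharding is on, as exercised in the regression tests below), GetNextColocationId() takes the GetNextColocationIdFromNode() path and fetches the id from the coordinator. A hedged sketch, assuming a metadata-synced worker and the coordinator present in the metadata:

\c - - - :worker_1_port
SET citus.enable_schema_based_sharding TO ON;
-- the tenant schema's colocation id is allocated on the coordinator via
-- SELECT citus_internal.get_next_colocation_id()
CREATE SCHEMA tenant_from_worker;
DROP SCHEMA tenant_from_worker;
SET citus.enable_schema_based_sharding TO OFF;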

View File

@@ -39,7 +39,6 @@ extern void InsertColocationGroupLocally(uint32 colocationId, int shardCount,
Oid distributionColumnCollation);
extern bool IsColocateWithNone(char *colocateWithTableName);
extern bool IsColocateWithDefault(char *colocateWithTableName);
extern uint32 GetNextColocationId(void);
extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);

View File

@@ -1667,7 +1667,8 @@ ALTER EXTENSION citus UPDATE TO '14.0-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
| function citus_internal.get_next_colocation_id() bigint
(1 row)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@@ -1490,8 +1490,7 @@ HINT: Connect to the coordinator node and try again.
-- test creating a tenant schema from workers
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA worker_tenant_schema;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
DROP SCHEMA worker_tenant_schema;
SET citus.enable_schema_based_sharding TO OFF;
-- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
-- commands that we send to workers don't recursively try creating a
@@ -1511,14 +1510,11 @@ SELECT pg_reload_conf();
t
(1 row)
-- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
-- on workers.
-- Verify that citus_internal.unregister_tenant_schema_globally can be called
-- from workers too, but it will fail in this case because we haven't dropped
-- the schema yet.
SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
unregister_tenant_schema_globally
---------------------------------------------------------------------
(1 row)
ERROR: schema is expected to be already dropped because this function is only expected to be called from Citus drop hook
\c - - - :master_port
SET search_path TO regular_schema;
SET citus.next_shard_id TO 1950000;

View File

@@ -96,6 +96,7 @@ ORDER BY 1;
function citus_internal.delete_shard_metadata(bigint)
function citus_internal.delete_tenant_schema(oid)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.get_next_colocation_id()
function citus_internal.global_blocked_processes()
function citus_internal.is_replication_origin_tracking_active()
function citus_internal.local_blocked_processes()
@@ -402,6 +403,6 @@ ORDER BY 1;
view citus_tables
view pg_dist_shard_placement
view time_partitions
(370 rows)
(371 rows)
DROP TABLE extension_basic_types;

View File

@@ -1009,6 +1009,7 @@ CREATE TABLE tenant_3.tbl_1(a int, b text);
-- test creating a tenant schema from workers
SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA worker_tenant_schema;
DROP SCHEMA worker_tenant_schema;
SET citus.enable_schema_based_sharding TO OFF;
-- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
@@ -1022,8 +1023,9 @@ SELECT pg_reload_conf();
ALTER SYSTEM SET citus.enable_schema_based_sharding TO ON;
SELECT pg_reload_conf();
-- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
-- on workers.
-- Verify that citus_internal.unregister_tenant_schema_globally can be called
-- from workers too, but it will fail in this case because we haven't dropped
-- the schema yet.
SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
\c - - - :master_port