From 988ce891798c808cc54f9162e501625c0bcfa244 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 18 Nov 2025 15:21:23 +0300 Subject: [PATCH] allow "alter table set schema" from any node for Postgres tables - citus managed local tables --- .../distributed/commands/alter_table.c | 4 +- .../commands/create_distributed_table.c | 38 ++--- .../commands/schema_based_sharding.c | 8 +- src/backend/distributed/commands/table.c | 4 +- .../distributed/metadata/metadata_utility.c | 14 +- .../replicate_none_dist_table_shard.c | 143 ++++++++++-------- .../distributed/operations/shard_transfer.c | 3 +- src/include/distributed/metadata_utility.h | 2 +- src/include/distributed/shard_transfer.h | 1 + 9 files changed, 123 insertions(+), 94 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 978c868b5..54d7b9bfb 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1524,9 +1524,9 @@ CreateCitusTableLike(TableConversionState *con) .colocateWithTableName = quote_qualified_identifier(con->schemaName, con->relationName) }; - bool allowFromWorkersIfPostgresTable = false; + bool allowFromWorkers = false; CreateSingleShardTable(con->newRelationId, colocationParam, - allowFromWorkersIfPostgresTable); + allowFromWorkers); } else { diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index c8456bc27..3b789e047 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -147,7 +147,8 @@ static void CreateCitusTable(Oid relationId, CitusTableType tableType, static void ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, DistributedTableParams * - distributedTableParams); + distributedTableParams, + bool allowFromWorkers); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, @@ -303,9 +304,9 @@ create_distributed_table(PG_FUNCTION_ARGS) .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = colocateWithTableName, }; - bool allowFromWorkersIfPostgresTable = false; + bool allowFromWorkers = false; CreateSingleShardTable(relationId, colocationParam, - allowFromWorkersIfPostgresTable); + allowFromWorkers); } PG_RETURN_VOID(); @@ -1057,17 +1058,18 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, void CreateReferenceTable(Oid relationId) { + bool allowFromWorkers = false; if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { /* * Create the shard of given Citus local table on workers to convert * it into a reference table. */ - ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL); + ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL, + allowFromWorkers); } else { - bool allowFromWorkers = false; CreateCitusTable(relationId, REFERENCE_TABLE, NULL, allowFromWorkers); } } @@ -1079,7 +1081,7 @@ CreateReferenceTable(Oid relationId) */ void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam, - bool allowFromWorkersIfPostgresTable) + bool allowFromWorkers) { DistributedTableParams distributedTableParams = { .colocationParam = colocationParam, @@ -1096,12 +1098,13 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam, * table. 
*/ ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED, - &distributedTableParams); + &distributedTableParams, + allowFromWorkers); } else { CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams, - allowFromWorkersIfPostgresTable); + allowFromWorkers); } } @@ -1407,7 +1410,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, */ static void ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, - DistributedTableParams *distributedTableParams) + DistributedTableParams *distributedTableParams, + bool allowFromWorkers) { if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { @@ -1426,7 +1430,6 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, "not be otherwise"))); } - bool allowFromWorkers = false; EnsureCitusTableCanBeCreated(relationId, allowFromWorkers); Relation relation = try_relation_open(relationId, ExclusiveLock); @@ -1497,11 +1500,11 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, /* * When converting to a single shard table, we want to drop the placement * on the coordinator, but only if transferring to a different node. In that - * case, shouldDropLocalPlacement is true. When converting to a reference + * case, shouldDropCoordPlacement is true. When converting to a reference * table, we always keep the placement on the coordinator, so for reference - * tables shouldDropLocalPlacement is always false. + * tables shouldDropCoordPlacement is always false. */ - bool shouldDropLocalPlacement = false; + bool shouldDropCoordPlacement = false; List *targetNodeList = NIL; if (tableType == SINGLE_SHARD_DISTRIBUTED) @@ -1513,7 +1516,7 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); targetNodeList = list_make1(targetNode); - shouldDropLocalPlacement = true; + shouldDropCoordPlacement = true; } } else if (tableType == REFERENCE_TABLE) @@ -1533,7 +1536,7 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, NoneDistTableReplicateCoordinatorPlacement(relationId, targetNodeList); } - if (shouldDropLocalPlacement) + if (shouldDropCoordPlacement) { /* * We don't yet drop the local placement before handling partitions. 
@@ -1583,14 +1586,15 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
 .distributionColumnName =
 distributedTableParams->distributionColumnName,
 };
 ConvertCitusLocalTableToTableType(partitionRelationId, tableType,
- &childDistributedTableParams);
+ &childDistributedTableParams,
+ allowFromWorkers);
 }
 MemoryContextSwitchTo(oldContext);
 MemoryContextDelete(citusPartitionContext);
 }
- if (shouldDropLocalPlacement)
+ if (shouldDropCoordPlacement)
 {
 NoneDistTableDropCoordinatorPlacementTable(relationId);
 }
diff --git a/src/backend/distributed/commands/schema_based_sharding.c b/src/backend/distributed/commands/schema_based_sharding.c
index 05b7260c4..987aafcb7 100644
--- a/src/backend/distributed/commands/schema_based_sharding.c
+++ b/src/backend/distributed/commands/schema_based_sharding.c
@@ -285,8 +285,8 @@ CreateTenantSchemaTable(Oid relationId)
 .colocationParamType = COLOCATE_WITH_COLOCATION_ID,
 .colocationId = colocationId,
 };
- bool allowFromWorkersIfPostgresTable = true;
- CreateSingleShardTable(relationId, colocationParam, allowFromWorkersIfPostgresTable);
+ bool allowFromWorkers = true;
+ CreateSingleShardTable(relationId, colocationParam, allowFromWorkers);
 }
@@ -681,9 +681,9 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
 originalForeignKeyRecreationCommands, fkeyCommandsForRelation);
 DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);
- bool allowFromWorkersIfPostgresTable = false; /* TODOTASK: should we allow? */
+ bool allowFromWorkers = false; /* TODOTASK: should we allow? */
 CreateSingleShardTable(relationId, colocationParam,
- allowFromWorkersIfPostgresTable);
+ allowFromWorkers);
 }
 /* We can skip foreign key validations as we are sure about them at start */
diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c
index 5f5e152aa..c4f81a382 100644
--- a/src/backend/distributed/commands/table.c
+++ b/src/backend/distributed/commands/table.c
@@ -666,9 +666,9 @@ DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationI
 .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
 .colocateWithTableName = parentRelationName,
 };
- bool allowFromWorkersIfPostgresTable = false;
+ bool allowFromWorkers = false;
 CreateSingleShardTable(partitionRelationId, colocationParam,
- allowFromWorkersIfPostgresTable);
+ allowFromWorkers);
 return;
 }
diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c
index 72196dfef..860ce4e30 100644
--- a/src/backend/distributed/metadata/metadata_utility.c
+++ b/src/backend/distributed/metadata/metadata_utility.c
@@ -1453,13 +1453,13 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
 /*
- * IsRemoteShardPlacement returns true if the shard placement is on a remote
- * node.
+ * IsNonCoordShardPlacement returns true if the shard placement is on a node
+ * other than the coordinator.
 */
 bool
-IsRemoteShardPlacement(ShardPlacement *shardPlacement)
+IsNonCoordShardPlacement(ShardPlacement *shardPlacement)
 {
- return shardPlacement->groupId != GetLocalGroupId();
+ return shardPlacement->groupId != COORDINATOR_GROUP_ID;
 }
@@ -1860,7 +1860,7 @@ InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
 char *insertPlacementCommand =
 AddPlacementMetadataCommand(shardId, placementId,
 shardLength, groupId);
- SendCommandToWorkersWithMetadata(insertPlacementCommand);
+ SendCommandToRemoteNodesWithMetadata(insertPlacementCommand);
 return LoadShardPlacement(shardId, placementId);
 }
@@ -2095,7 +2095,7 @@ DeleteShardPlacementRowGlobally(uint64 placementId)
 char *deletePlacementCommand =
 DeletePlacementMetadataCommand(placementId);
- SendCommandToWorkersWithMetadata(deletePlacementCommand);
+ SendCommandToRemoteNodesWithMetadata(deletePlacementCommand);
 }
@@ -2371,7 +2371,7 @@ UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
 replicationModel, colocationId,
 autoConverted);
- SendCommandToWorkersWithMetadata(metadataCommand);
+ SendCommandToRemoteNodesWithMetadata(metadataCommand);
 }
 }
diff --git a/src/backend/distributed/operations/replicate_none_dist_table_shard.c b/src/backend/distributed/operations/replicate_none_dist_table_shard.c
index aa48b488a..83d660770 100644
--- a/src/backend/distributed/operations/replicate_none_dist_table_shard.c
+++ b/src/backend/distributed/operations/replicate_none_dist_table_shard.c
@@ -14,6 +14,7 @@
 #include "miscadmin.h"
 #include "nodes/pg_list.h"
+#include "utils/builtins.h"
 #include "distributed/adaptive_executor.h"
 #include "distributed/commands.h"
@@ -22,6 +23,7 @@
 #include "distributed/deparse_shard_query.h"
 #include "distributed/listutils.h"
 #include "distributed/replicate_none_dist_table_shard.h"
+#include "distributed/shard_transfer.h"
 #include "distributed/shard_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -30,32 +32,31 @@
 static void CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId);
 static Oid ForeignConstraintGetReferencingTableId(const char *queryString);
 static void EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId);
-static void SetLocalEnableManualChangesToShard(bool state);
 /*
- * NoneDistTableReplicateCoordinatorPlacement replicates local (presumably
- * coordinator) shard placement of given none-distributed table to given
+ * NoneDistTableReplicateCoordinatorPlacement replicates the coordinator
+ * shard placement of given none-distributed table to given
 * target nodes and inserts records for new placements into pg_dist_placement.
 */
 void
 NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
 List *targetNodeList)
 {
- EnsureCoordinator();
+ EnsurePropagationToCoordinator();
 EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId);
 /*
- * We don't expect callers try to replicate the shard to remote nodes
- * if some of the remote nodes have a placement for the shard already.
+ * We don't expect callers to try to replicate the shard to worker nodes
+ * if some of the worker nodes have a placement for the shard already.
 */
 int64 shardId = GetFirstShardId(noneDistTableId);
- List *remoteShardPlacementList =
+ List *nonCoordShardPlacementList =
 FilterShardPlacementList(ActiveShardPlacementList(shardId),
- IsRemoteShardPlacement);
- if (list_length(remoteShardPlacementList) > 0)
+ IsNonCoordShardPlacement);
+ if (list_length(nonCoordShardPlacementList) > 0)
 {
- ereport(ERROR, (errmsg("table already has a remote shard placement")));
+ ereport(ERROR, (errmsg("table already has a shard placement on a worker")));
 }
 uint64 shardLength = ShardLength(shardId);
@@ -78,22 +79,64 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
 CreateShardsOnWorkers(noneDistTableId, insertedPlacementList,
 useExclusiveConnection);
- /* fetch coordinator placement before deleting it */
- Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId);
 ShardPlacement *coordinatorPlacement = linitial(
 ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID));
 /*
- * CreateForeignKeysFromReferenceTablesOnShards and CopyFromLocalTableIntoDistTable
- * need to ignore the local placement, hence we temporarily delete it before
- * calling them.
+ * The work done below to replicate the shard and
+ * CreateForeignKeysFromReferenceTablesOnShards() itself need to ignore the
+ * coordinator shard placement, hence we temporarily delete it using
+ * DeleteShardPlacementRowGlobally() before moving forward.
 */
- DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
+ if (IsCoordinator())
+ {
+ /* TODOTASK: maybe remove this codepath? "else" can possibly handle coordinator-placement too */
- /* and copy data from local placement to new placements */
- CopyFromLocalTableIntoDistTable(
- localPlacementTableId, noneDistTableId
- );
+ /* fetch coordinator placement before deleting it */
+ Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId);
+
+ DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
+
+ /* and copy data from local placement to new placements */
+ CopyFromLocalTableIntoDistTable(
+ localPlacementTableId, noneDistTableId
+ );
+ }
+ else
+ {
+ DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
+
+ List *taskList = NIL;
+ uint64 jobId = INVALID_JOB_ID;
+ uint32 taskId = 0;
+ WorkerNode *targetNode = NULL;
+ foreach_declared_ptr(targetNode, targetNodeList)
+ {
+ Task *task = CitusMakeNode(Task);
+ task->jobId = jobId;
+ task->taskId = taskId++;
+ task->taskType = READ_TASK;
+ task->replicationModel = REPLICATION_MODEL_INVALID;
+ char *shardCopyCommand = CreateShardCopyCommand(LoadShardInterval(shardId),
+ targetNode);
+ SetTaskQueryStringList(task, list_make1(shardCopyCommand));
+
+ /* we already verified that coordinator is in the metadata */
+ WorkerNode *coordinatorNode = CoordinatorNodeIfAddedAsWorkerOrError();
+
+ /*
+ * We need to execute the task at the source node as we'll copy the shard
+ * from there, i.e., the coordinator.
+ */
+ ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
+ SetPlacementNodeMetadata(taskPlacement, coordinatorNode);
+
+ task->taskPlacementList = list_make1(taskPlacement);
+ taskList = lappend(taskList, task);
+ }
+
+ ExecuteTaskList(ROW_MODIFY_READONLY, taskList);
+ }
 /*
 * CreateShardsOnWorkers only creates the foreign keys where given relation
@@ -116,12 +158,12 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
 /*
 * NoneDistTableDeleteCoordinatorPlacement deletes pg_dist_placement record for
- * local (presumably coordinator) shard placement of given none-distributed table.
+ * the coordinator shard placement of given none-distributed table. */ void NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId) { - EnsureCoordinator(); + EnsurePropagationToCoordinator(); EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId); int64 shardId = GetFirstShardId(noneDistTableId); @@ -130,41 +172,25 @@ NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId) ShardPlacement *coordinatorPlacement = linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID)); - /* remove the old placement from metadata of local node, i.e., coordinator */ + /* remove the old placement from metadata */ DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId); } /* - * NoneDistTableDropCoordinatorPlacementTable drops local (presumably coordinator) + * NoneDistTableDropCoordinatorPlacementTable drops the coordinator * shard placement table of given none-distributed table. */ void NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) { - EnsureCoordinator(); + EnsurePropagationToCoordinator(); if (HasDistributionKey(noneDistTableId)) { ereport(ERROR, (errmsg("table is not a none-distributed table"))); } - /* - * We undistribute Citus local tables that are not chained with any reference - * tables via foreign keys at the end of the utility hook. - * Here we temporarily set the related GUC to off to disable the logic for - * internally executed DDL's that might invoke this mechanism unnecessarily. - * - * We also temporarily disable citus.enable_manual_changes_to_shards GUC to - * allow given command to modify shard. Note that we disable it only for - * local session because changes made to shards are allowed for Citus internal - * backends anyway. - */ - int saveNestLevel = NewGUCNestLevel(); - - SetLocalEnableLocalReferenceForeignKeys(false); - SetLocalEnableManualChangesToShard(true); - StringInfo dropShardCommand = makeStringInfo(); int64 shardId = GetFirstShardId(noneDistTableId); ShardInterval *shardInterval = LoadShardInterval(shardId); @@ -176,7 +202,22 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) task->taskId = INVALID_TASK_ID; task->taskType = DDL_TASK; task->replicationModel = REPLICATION_MODEL_INVALID; - SetTaskQueryString(task, dropShardCommand->data); + + /* + * We undistribute Citus local tables that are not chained with any reference + * tables via foreign keys at the end of the utility hook. + * So we need to temporarily set the related GUC to off to disable the logic for + * internally executed DDL's that might invoke this mechanism unnecessarily. + * + * We also temporarily disable citus.enable_manual_changes_to_shards GUC to + * allow given command to modify shard. 
+ */ + List *taskQueryStringList = list_make3( + "SET LOCAL citus.enable_local_reference_table_foreign_keys TO OFF;", + "SET LOCAL citus.enable_manual_changes_to_shards TO ON;", + dropShardCommand->data + ); + SetTaskQueryStringList(task, taskQueryStringList); ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement); SetPlacementNodeMetadata(targetPlacement, CoordinatorNodeIfAddedAsWorkerOrError()); @@ -185,8 +226,6 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) bool localExecutionSupported = true; ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); - - AtEOXact_GUC(true, saveNestLevel); } @@ -198,7 +237,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) static void CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId) { - EnsureCoordinator(); + EnsurePropagationToCoordinator(); if (HasDistributionKey(noneDistTableId)) { @@ -287,17 +326,3 @@ EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId) ereport(ERROR, (errmsg("table does not have a coordinator placement"))); } } - - -/* - * SetLocalEnableManualChangesToShard locally enables - * citus.enable_manual_changes_to_shards GUC. - */ -static void -SetLocalEnableManualChangesToShard(bool state) -{ - set_config_option("citus.enable_manual_changes_to_shards", - state ? "on" : "off", - (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); -} diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 7d3b0e655..03b1e778d 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -166,7 +166,6 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval, int32 sourceNodePort); static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval, List *ddlCommandList); -static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId, const char *operationName); @@ -2074,7 +2073,7 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte * worker node. This command needs to be run on the node wher you want to copy * the shard from. 
*/ -static char * +char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode) { diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 087bbcb2e..3d5a9fd33 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -325,7 +325,7 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval); extern uint64 ShardLength(uint64 shardId); extern bool NodeGroupHasShardPlacements(int32 groupId); extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement); -extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement); +extern bool IsNonCoordShardPlacement(ShardPlacement *shardPlacement); extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode); extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)( ShardPlacement *)); diff --git a/src/include/distributed/shard_transfer.h b/src/include/distributed/shard_transfer.h index 1265fc0b4..f026947c6 100644 --- a/src/include/distributed/shard_transfer.h +++ b/src/include/distributed/shard_transfer.h @@ -76,6 +76,7 @@ extern uint64 ShardListSizeInBytes(List *colocatedShardList, extern void ErrorIfMoveUnsupportedTableType(Oid relationId); extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardIntervalList, char *snapshotName); +extern char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode); extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList); extern bool RelationCanPublishAllModifications(Oid relationId); extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalList,
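
A note on the new worker-initiated path in NoneDistTableReplicateCoordinatorPlacement: when the conversion is started from a node other than the coordinator, the patch replicates the coordinator's shard by building one READ_TASK per target node and pinning each task to the coordinator's placement, since the coordinator is the source of the copy. The condensed sketch below restates that pattern; the helper name CopyCoordinatorShardToNodes is hypothetical, and only the Citus internals the patch itself uses (CitusMakeNode, LoadShardInterval, CreateShardCopyCommand, SetTaskQueryStringList, SetPlacementNodeMetadata, ExecuteTaskList) are assumed.

/*
 * Hypothetical helper distilling the "else" branch above: copy the
 * coordinator's shard placement to each target node by running the
 * shard-copy command on the coordinator itself.
 */
static void
CopyCoordinatorShardToNodes(int64 shardId, List *targetNodeList)
{
	List *taskList = NIL;
	uint32 taskId = 0;

	/* the caller already verified that the coordinator is in the metadata */
	WorkerNode *coordinatorNode = CoordinatorNodeIfAddedAsWorkerOrError();

	WorkerNode *targetNode = NULL;
	foreach_declared_ptr(targetNode, targetNodeList)
	{
		Task *task = CitusMakeNode(Task);
		task->jobId = INVALID_JOB_ID;
		task->taskId = taskId++;
		task->taskType = READ_TASK;
		task->replicationModel = REPLICATION_MODEL_INVALID;

		/* the command copies the shard from the node it runs on to targetNode */
		char *shardCopyCommand =
			CreateShardCopyCommand(LoadShardInterval(shardId), targetNode);
		SetTaskQueryStringList(task, list_make1(shardCopyCommand));

		/* execute at the source of the copy, i.e., the coordinator */
		ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
		SetPlacementNodeMetadata(taskPlacement, coordinatorNode);
		task->taskPlacementList = list_make1(taskPlacement);

		taskList = lappend(taskList, task);
	}

	ExecuteTaskList(ROW_MODIFY_READONLY, taskList);
}

Exporting CreateShardCopyCommand from shard_transfer.c (the extern change above) is what makes this reuse possible outside the shard-transfer code.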
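
A note on the GUC handling in NoneDistTableDropCoordinatorPlacementTable: the patch removes the set_config_option()/AtEOXact_GUC() approach, which only changes settings in the session of the node running the function, and instead ships the GUC changes with the task as SET LOCAL statements, so they take effect in the transaction that actually runs the DROP on the coordinator placement, even when the command is initiated from a worker. A minimal sketch of the same pattern, under the same caveats as above (hypothetical helper name; only APIs already used in the patch):

/*
 * Hypothetical helper mirroring the DDL task built above: GUC changes are
 * scoped to the remote transaction via SET LOCAL instead of being toggled
 * in the local session.
 */
static void
ExecuteDropShardCommandOnCoordinator(char *dropShardCommand)
{
	Task *task = CitusMakeNode(Task);
	task->jobId = INVALID_JOB_ID;
	task->taskId = INVALID_TASK_ID;
	task->taskType = DDL_TASK;
	task->replicationModel = REPLICATION_MODEL_INVALID;

	/* the SET LOCALs travel with the DDL and expire with its transaction */
	List *taskQueryStringList = list_make3(
		"SET LOCAL citus.enable_local_reference_table_foreign_keys TO OFF;",
		"SET LOCAL citus.enable_manual_changes_to_shards TO ON;",
		dropShardCommand);
	SetTaskQueryStringList(task, taskQueryStringList);

	/* target the coordinator's placement, wherever this runs */
	ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
	SetPlacementNodeMetadata(targetPlacement,
							 CoordinatorNodeIfAddedAsWorkerOrError());
	task->taskPlacementList = list_make1(targetPlacement);

	bool localExecutionSupported = true;
	ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
}

This is also why the static SetLocalEnableManualChangesToShard() helper could be deleted: nothing toggles the GUC in the local session anymore.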