From f650f9ced4623d3176a260713ba003a743ddb9d2 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 18 Nov 2025 16:04:23 +0300 Subject: [PATCH] fixup! allow "alter table set schema" from any node for Postgres tables - citus managed local tables merge codepaths --- .../replicate_none_dist_table_shard.c | 103 +++++++++--------- 1 file changed, 51 insertions(+), 52 deletions(-) diff --git a/src/backend/distributed/operations/replicate_none_dist_table_shard.c b/src/backend/distributed/operations/replicate_none_dist_table_shard.c index 83d660770..2cdf3ebbd 100644 --- a/src/backend/distributed/operations/replicate_none_dist_table_shard.c +++ b/src/backend/distributed/operations/replicate_none_dist_table_shard.c @@ -29,6 +29,7 @@ #include "distributed/worker_protocol.h" +static void ReplicateCoordinatorPlacementData(uint64 shardId, List *targetNodeList); static void CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId); static Oid ForeignConstraintGetReferencingTableId(const char *queryString); static void EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId); @@ -82,60 +83,14 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId, ShardPlacement *coordinatorPlacement = linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID)); + /* copy data from coordinator placement to new placements */ + ReplicateCoordinatorPlacementData(shardId, targetNodeList); + /* - * The work done below to replicate the shard and - * CreateForeignKeysFromReferenceTablesOnShards() itself need to ignore the - * coordinator shard placement, hence we temporarily delete it using - * DeleteShardPlacementRowGlobally() before moving forward. + * CreateForeignKeysFromReferenceTablesOnShards + * needs to ignore the local placement, hence we temporarily delete it. */ - if (IsCoordinator()) - { - /* TODOTASK: maybe remove this codepath? "else" can possibly handle coordinator-placement too */ - - /* fetch coordinator placement before deleting it */ - Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId); - - DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId); - - /* and copy data from local placement to new placements */ - CopyFromLocalTableIntoDistTable( - localPlacementTableId, noneDistTableId - ); - } - else - { - DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId); - - List *taskList = NIL; - uint64 jobId = INVALID_JOB_ID; - uint32 taskId = 0; - foreach_declared_ptr(targetNode, targetNodeList) - { - Task *task = CitusMakeNode(Task); - task->jobId = jobId; - task->taskId = taskId++; - task->taskType = READ_TASK; - task->replicationModel = REPLICATION_MODEL_INVALID; - char *shardCopyCommand = CreateShardCopyCommand(LoadShardInterval(shardId), - targetNode); - SetTaskQueryStringList(task, list_make1(shardCopyCommand)); - - /* we already verified that coordinator is in the metadata */ - WorkerNode *coordinatorNode = CoordinatorNodeIfAddedAsWorkerOrError(); - - /* - * Need execute the task at the source node as we'll copy the shard - * from there, i.e., the coordinator. - */ - ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - SetPlacementNodeMetadata(taskPlacement, coordinatorNode); - - task->taskPlacementList = list_make1(taskPlacement); - taskList = lappend(taskList, task); - } - - ExecuteTaskList(ROW_MODIFY_READONLY, taskList); - } + DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId); /* * CreateShardsOnWorkers only creates the foreign keys where given relation @@ -229,6 +184,50 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId) } +/* + * ReplicateCoordinatorPlacementData copies data from the coordinator + * shard placement to the target nodes. + * + * Assumes that the shard placements already exist on the coordinator and + * target nodes. + */ +static void +ReplicateCoordinatorPlacementData(uint64 shardId, List *targetNodeList) +{ + List *taskList = NIL; + + uint64 jobId = INVALID_JOB_ID; + uint32 taskId = 0; + WorkerNode *targetNode = NULL; + foreach_declared_ptr(targetNode, targetNodeList) + { + Task *task = CitusMakeNode(Task); + task->jobId = jobId; + task->taskId = taskId++; + task->taskType = READ_TASK; + task->replicationModel = REPLICATION_MODEL_INVALID; + char *shardCopyCommand = CreateShardCopyCommand(LoadShardInterval(shardId), + targetNode); + SetTaskQueryStringList(task, list_make1(shardCopyCommand)); + + /* we already verified that coordinator is in the metadata */ + WorkerNode *coordinatorNode = CoordinatorNodeIfAddedAsWorkerOrError(); + + /* + * Need execute the task at the source node as we'll copy the shard + * from there, i.e., the coordinator. + */ + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, coordinatorNode); + + task->taskPlacementList = list_make1(taskPlacement); + taskList = lappend(taskList, task); + } + + ExecuteTaskList(ROW_MODIFY_READONLY, taskList); +} + + /* * CreateForeignKeysFromReferenceTablesOnShards creates foreign keys on shards * where given none-distributed table is the referenced table and the referencing