allow "alter table set schema" from any node for Postgres tables - citus managed local tables

ddl-from-any-node-phase-1
Onur Tirtir 2025-11-18 15:21:23 +03:00
parent c14b135780
commit 988ce89179
9 changed files with 123 additions and 94 deletions

View File

@@ -1524,9 +1524,9 @@ CreateCitusTableLike(TableConversionState *con)
.colocateWithTableName = quote_qualified_identifier(con->schemaName,
con->relationName)
};
bool allowFromWorkersIfPostgresTable = false;
bool allowFromWorkers = false;
CreateSingleShardTable(con->newRelationId, colocationParam,
allowFromWorkersIfPostgresTable);
allowFromWorkers);
}
else
{

View File

@@ -147,7 +147,8 @@ static void CreateCitusTable(Oid relationId, CitusTableType tableType,
static void ConvertCitusLocalTableToTableType(Oid relationId,
CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
distributedTableParams,
bool allowFromWorkers);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
@@ -303,9 +304,9 @@ create_distributed_table(PG_FUNCTION_ARGS)
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = colocateWithTableName,
};
bool allowFromWorkersIfPostgresTable = false;
bool allowFromWorkers = false;
CreateSingleShardTable(relationId, colocationParam,
allowFromWorkersIfPostgresTable);
allowFromWorkers);
}
PG_RETURN_VOID();
@@ -1057,17 +1058,18 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
void
CreateReferenceTable(Oid relationId)
{
bool allowFromWorkers = false;
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/*
* Create the shard of given Citus local table on workers to convert
* it into a reference table.
*/
ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL);
ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL,
allowFromWorkers);
}
else
{
bool allowFromWorkers = false;
CreateCitusTable(relationId, REFERENCE_TABLE, NULL, allowFromWorkers);
}
}
@@ -1079,7 +1081,7 @@ CreateReferenceTable(Oid relationId)
*/
void
CreateSingleShardTable(Oid relationId, ColocationParam colocationParam,
bool allowFromWorkersIfPostgresTable)
bool allowFromWorkers)
{
DistributedTableParams distributedTableParams = {
.colocationParam = colocationParam,
@@ -1096,12 +1098,13 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam,
* table.
*/
ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED,
&distributedTableParams);
&distributedTableParams,
allowFromWorkers);
}
else
{
CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams,
allowFromWorkersIfPostgresTable);
allowFromWorkers);
}
}
@@ -1407,7 +1410,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
*/
static void
ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
DistributedTableParams *distributedTableParams,
bool allowFromWorkers)
{
if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
@@ -1426,7 +1430,6 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
"not be otherwise")));
}
bool allowFromWorkers = false;
EnsureCitusTableCanBeCreated(relationId, allowFromWorkers);
Relation relation = try_relation_open(relationId, ExclusiveLock);
@@ -1497,11 +1500,11 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
/*
* When converting to a single shard table, we want to drop the placement
* on the coordinator, but only if transferring to a different node. In that
* case, shouldDropLocalPlacement is true. When converting to a reference
* case, shouldDropCoordPlacement is true. When converting to a reference
* table, we always keep the placement on the coordinator, so for reference
* tables shouldDropLocalPlacement is always false.
* tables shouldDropCoordPlacement is always false.
*/
bool shouldDropLocalPlacement = false;
bool shouldDropCoordPlacement = false;
List *targetNodeList = NIL;
if (tableType == SINGLE_SHARD_DISTRIBUTED)
@@ -1513,7 +1516,7 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
targetNodeList = list_make1(targetNode);
shouldDropLocalPlacement = true;
shouldDropCoordPlacement = true;
}
}
else if (tableType == REFERENCE_TABLE)
@@ -1533,7 +1536,7 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
NoneDistTableReplicateCoordinatorPlacement(relationId, targetNodeList);
}
if (shouldDropLocalPlacement)
if (shouldDropCoordPlacement)
{
/*
* We don't yet drop the local placement before handling partitions.
@@ -1583,14 +1586,15 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
.distributionColumnName = distributedTableParams->distributionColumnName,
};
ConvertCitusLocalTableToTableType(partitionRelationId, tableType,
&childDistributedTableParams);
&childDistributedTableParams,
allowFromWorkers);
}
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(citusPartitionContext);
}
if (shouldDropLocalPlacement)
if (shouldDropCoordPlacement)
{
NoneDistTableDropCoordinatorPlacementTable(relationId);
}
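
Across the hunks above, allowFromWorkersIfPostgresTable is renamed to allowFromWorkers and, more importantly, turned into a parameter of ConvertCitusLocalTableToTableType() instead of a hard-coded false inside it, so the conversion path can now honor callers that permit execution from non-coordinator nodes. A minimal sketch of the resulting flow, condensed from these hunks (bodies elided; not the full Citus functions):

	void
	CreateSingleShardTable(Oid relationId, ColocationParam colocationParam,
						   bool allowFromWorkers)
	{
		DistributedTableParams distributedTableParams = {
			.colocationParam = colocationParam
		};

		if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
		{
			/* the flag now reaches the conversion path as well */
			ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED,
											  &distributedTableParams,
											  allowFromWorkers);
		}
		else
		{
			CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED,
							 &distributedTableParams, allowFromWorkers);
		}
	}

	static void
	ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
									  DistributedTableParams *distributedTableParams,
									  bool allowFromWorkers)
	{
		/* previously hard-coded here: bool allowFromWorkers = false; */
		EnsureCitusTableCanBeCreated(relationId, allowFromWorkers);

		/*
		 * ... conversion continues; the partition recursion near the end
		 * of this function forwards allowFromWorkers unchanged ...
		 */
	}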

View File

@@ -285,8 +285,8 @@ CreateTenantSchemaTable(Oid relationId)
.colocationParamType = COLOCATE_WITH_COLOCATION_ID,
.colocationId = colocationId,
};
bool allowFromWorkersIfPostgresTable = true;
CreateSingleShardTable(relationId, colocationParam, allowFromWorkersIfPostgresTable);
bool allowFromWorkers = true;
CreateSingleShardTable(relationId, colocationParam, allowFromWorkers);
}
@@ -681,9 +681,9 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
originalForeignKeyRecreationCommands, fkeyCommandsForRelation);
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);
bool allowFromWorkersIfPostgresTable = false; /* TODOTASK: should we allow? */
bool allowFromWorkers = false; /* TODOTASK: should we allow? */
CreateSingleShardTable(relationId, colocationParam,
allowFromWorkersIfPostgresTable);
allowFromWorkers);
}
/* We can skip foreign key validations as we are sure about them at start */

View File

@@ -666,9 +666,9 @@ DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationI
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = parentRelationName,
};
bool allowFromWorkersIfPostgresTable = false;
bool allowFromWorkers = false;
CreateSingleShardTable(partitionRelationId, colocationParam,
allowFromWorkersIfPostgresTable);
allowFromWorkers);
return;
}

View File

@@ -1453,13 +1453,13 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
/*
* IsRemoteShardPlacement returns true if the shard placement is on a remote
* node.
* IsNonCoordShardPlacement returns true if the shard placement is on a node
* other than the coordinator.
*/
bool
IsRemoteShardPlacement(ShardPlacement *shardPlacement)
IsNonCoordShardPlacement(ShardPlacement *shardPlacement)
{
return shardPlacement->groupId != GetLocalGroupId();
return shardPlacement->groupId != COORDINATOR_GROUP_ID;
}
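
The rename is not merely cosmetic. The old predicate compared against GetLocalGroupId(), so "remote" meant "not the node evaluating this" and the answer depended on where the code ran; the new one compares against the coordinator's group id, a constant (COORDINATOR_GROUP_ID is 0 in current Citus sources), so every node computes the same set of placements. Side by side:

	/* before: node-relative, giving a different result on coordinator vs. worker */
	return shardPlacement->groupId != GetLocalGroupId();

	/* after: absolute, giving the same result on every node, which is what
	 * lets callers of this predicate run from any node */
	return shardPlacement->groupId != COORDINATOR_GROUP_ID;
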
@@ -1860,7 +1860,7 @@ InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
char *insertPlacementCommand =
AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);
SendCommandToWorkersWithMetadata(insertPlacementCommand);
SendCommandToRemoteNodesWithMetadata(insertPlacementCommand);
return LoadShardPlacement(shardId, placementId);
}
@@ -2095,7 +2095,7 @@ DeleteShardPlacementRowGlobally(uint64 placementId)
char *deletePlacementCommand =
DeletePlacementMetadataCommand(placementId);
SendCommandToWorkersWithMetadata(deletePlacementCommand);
SendCommandToRemoteNodesWithMetadata(deletePlacementCommand);
}
@@ -2371,7 +2371,7 @@ UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
replicationModel,
colocationId,
autoConverted);
SendCommandToWorkersWithMetadata(metadataCommand);
SendCommandToRemoteNodesWithMetadata(metadataCommand);
}
}
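
All three metadata-propagation sites in this file switch from SendCommandToWorkersWithMetadata() to SendCommandToRemoteNodesWithMetadata(). The difference only matters once these functions may run on a worker: "workers" excludes the coordinator, whereas "remote nodes" means every metadata node other than the local one, so the coordinator also receives the update when the command originates elsewhere. A hedged contrast, as I understand the two helpers from current Citus sources:

	char *insertPlacementCommand =
		AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);

	/* old: metadata-synced workers only, never the coordinator; if this
	 * backend is itself a worker, the coordinator misses the update */
	SendCommandToWorkersWithMetadata(insertPlacementCommand);

	/* new: every metadata node except the local one, coordinator included */
	SendCommandToRemoteNodesWithMetadata(insertPlacementCommand);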

View File

@@ -14,6 +14,7 @@
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "utils/builtins.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
@@ -22,6 +23,7 @@
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/replicate_none_dist_table_shard.h"
#include "distributed/shard_transfer.h"
#include "distributed/shard_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@@ -30,32 +32,31 @@
static void CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId);
static Oid ForeignConstraintGetReferencingTableId(const char *queryString);
static void EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId);
static void SetLocalEnableManualChangesToShard(bool state);
/*
* NoneDistTableReplicateCoordinatorPlacement replicates local (presumably
* coordinator) shard placement of given none-distributed table to given
* NoneDistTableReplicateCoordinatorPlacement replicates the coordinator
* shard placement of the given none-distributed table to the given
* target nodes and inserts records for new placements into pg_dist_placement.
*/
void
NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
List *targetNodeList)
{
EnsureCoordinator();
EnsurePropagationToCoordinator();
EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId);
/*
* We don't expect callers try to replicate the shard to remote nodes
* if some of the remote nodes have a placement for the shard already.
* We don't expect callers to try to replicate the shard to worker nodes
* if some of the worker nodes have a placement for the shard already.
*/
int64 shardId = GetFirstShardId(noneDistTableId);
List *remoteShardPlacementList =
List *nonCoordShardPlacementList =
FilterShardPlacementList(ActiveShardPlacementList(shardId),
IsRemoteShardPlacement);
if (list_length(remoteShardPlacementList) > 0)
IsNonCoordShardPlacement);
if (list_length(nonCoordShardPlacementList) > 0)
{
ereport(ERROR, (errmsg("table already has a remote shard placement")));
ereport(ERROR, (errmsg("table already has a shard placement on a worker")));
}
uint64 shardLength = ShardLength(shardId);
@@ -78,22 +79,63 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
CreateShardsOnWorkers(noneDistTableId, insertedPlacementList,
useExclusiveConnection);
/* fetch coordinator placement before deleting it */
Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId);
ShardPlacement *coordinatorPlacement =
linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID));
/*
* CreateForeignKeysFromReferenceTablesOnShards and CopyFromLocalTableIntoDistTable
* need to ignore the local placement, hence we temporarily delete it before
* calling them.
* The work done below to replicate the shard and
* CreateForeignKeysFromReferenceTablesOnShards() itself need to ignore the
* coordinator shard placement, hence we temporarily delete it using
* DeleteShardPlacementRowGlobally() before moving forward.
*/
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
if (IsCoordinator())
{
/* TODOTASK: maybe remove this codepath? "else" can possibly handle coordinator-placement too */
/* and copy data from local placement to new placements */
CopyFromLocalTableIntoDistTable(
localPlacementTableId, noneDistTableId
);
/* fetch coordinator placement before deleting it */
Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId);
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
/* and copy data from local placement to new placements */
CopyFromLocalTableIntoDistTable(
localPlacementTableId, noneDistTableId
);
}
else
{
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
List *taskList = NIL;
uint64 jobId = INVALID_JOB_ID;
uint32 taskId = 0;
foreach_declared_ptr(targetNode, targetNodeList)
{
Task *task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = READ_TASK;
task->replicationModel = REPLICATION_MODEL_INVALID;
char *shardCopyCommand = CreateShardCopyCommand(LoadShardInterval(shardId),
targetNode);
SetTaskQueryStringList(task, list_make1(shardCopyCommand));
/* we already verified that the coordinator is in the metadata */
WorkerNode *coordinatorNode = CoordinatorNodeIfAddedAsWorkerOrError();
/*
* We need to execute the task at the source node, as we'll copy the shard
* from there, i.e., the coordinator.
*/
ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(taskPlacement, coordinatorNode);
task->taskPlacementList = list_make1(taskPlacement);
taskList = lappend(taskList, task);
}
ExecuteTaskList(ROW_MODIFY_READONLY, taskList);
}
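
Together with the EnsureCoordinator() to EnsurePropagationToCoordinator() switch at the top of this function, this else branch is what lets the replication be driven from a worker: the only copy of the data lives in the coordinator placement, so rather than COPYing locally, the function pins one READ_TASK per target node to the coordinator and has the adaptive executor run the copy there. Assuming CreateShardCopyCommand() keeps its current shape (see the sketch near the shard_transfer.c hunk further down), the query string each task carries looks roughly like this, with an invented shard name and node id:

	char *shardCopyCommand =
		"SELECT pg_catalog.worker_copy_table_to_node("
		"'public.mytable_102008'::regclass, 2);";
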
/*
* CreateShardsOnWorkers only creates the foreign keys where given relation
@@ -116,12 +158,12 @@ NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
/*
* NoneDistTableDeleteCoordinatorPlacement deletes pg_dist_placement record for
* local (presumably coordinator) shard placement of given none-distributed table.
* the coordinator shard placement of the given none-distributed table.
*/
void
NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId)
{
EnsureCoordinator();
EnsurePropagationToCoordinator();
EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId);
int64 shardId = GetFirstShardId(noneDistTableId);
@@ -130,41 +172,25 @@ NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId)
ShardPlacement *coordinatorPlacement =
linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID));
/* remove the old placement from metadata of local node, i.e., coordinator */
/* remove the old placement from metadata */
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
}
/*
* NoneDistTableDropCoordinatorPlacementTable drops local (presumably coordinator)
* NoneDistTableDropCoordinatorPlacementTable drops the coordinator
* shard placement table of given none-distributed table.
*/
void
NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
{
EnsureCoordinator();
EnsurePropagationToCoordinator();
if (HasDistributionKey(noneDistTableId))
{
ereport(ERROR, (errmsg("table is not a none-distributed table")));
}
/*
* We undistribute Citus local tables that are not chained with any reference
* tables via foreign keys at the end of the utility hook.
* Here we temporarily set the related GUC to off to disable the logic for
* internally executed DDL's that might invoke this mechanism unnecessarily.
*
* We also temporarily disable citus.enable_manual_changes_to_shards GUC to
* allow given command to modify shard. Note that we disable it only for
* local session because changes made to shards are allowed for Citus internal
* backends anyway.
*/
int saveNestLevel = NewGUCNestLevel();
SetLocalEnableLocalReferenceForeignKeys(false);
SetLocalEnableManualChangesToShard(true);
StringInfo dropShardCommand = makeStringInfo();
int64 shardId = GetFirstShardId(noneDistTableId);
ShardInterval *shardInterval = LoadShardInterval(shardId);
@@ -176,7 +202,22 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
task->taskId = INVALID_TASK_ID;
task->taskType = DDL_TASK;
task->replicationModel = REPLICATION_MODEL_INVALID;
SetTaskQueryString(task, dropShardCommand->data);
/*
* We undistribute Citus local tables that are not chained with any reference
* tables via foreign keys at the end of the utility hook.
* So we need to temporarily set the related GUC to off to disable the logic for
* internally executed DDLs that might invoke this mechanism unnecessarily.
*
* We also temporarily enable the citus.enable_manual_changes_to_shards GUC
* to allow the given command to modify the shard.
*/
List *taskQueryStringList = list_make3(
"SET LOCAL citus.enable_local_reference_table_foreign_keys TO OFF;",
"SET LOCAL citus.enable_manual_changes_to_shards TO ON;",
dropShardCommand->data
);
SetTaskQueryStringList(task, taskQueryStringList);
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(targetPlacement, CoordinatorNodeIfAddedAsWorkerOrError());
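
Folding the SET LOCAL statements into the task's query string list, rather than calling set_config_option() in the issuing session as the removed SetLocalEnableManualChangesToShard() helper did, is what makes this work from any node: the GUCs take effect in the transaction of whichever backend executes the DROP (the node holding the coordinator placement) and expire with it, which is also why the NewGUCNestLevel()/AtEOXact_GUC() bookkeeping disappears in the next hunk. The executing backend effectively runs (shard name invented for illustration; the real statement comes from dropShardCommand built above):

	/*
	 * SET LOCAL citus.enable_local_reference_table_foreign_keys TO OFF;
	 * SET LOCAL citus.enable_manual_changes_to_shards TO ON;
	 * DROP TABLE IF EXISTS public.mytable_102008 CASCADE;
	 *
	 * SET LOCAL scopes both GUCs to this transaction, so nothing needs
	 * to be reset afterwards on either node.
	 */
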
@@ -185,8 +226,6 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
AtEOXact_GUC(true, saveNestLevel);
}
@@ -198,7 +237,7 @@ NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
static void
CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId)
{
EnsureCoordinator();
EnsurePropagationToCoordinator();
if (HasDistributionKey(noneDistTableId))
{
@@ -287,17 +326,3 @@ EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId)
ereport(ERROR, (errmsg("table does not have a coordinator placement")));
}
}
/*
* SetLocalEnableManualChangesToShard locally enables
* citus.enable_manual_changes_to_shards GUC.
*/
static void
SetLocalEnableManualChangesToShard(bool state)
{
set_config_option("citus.enable_manual_changes_to_shards",
state ? "on" : "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
}

View File

@@ -166,7 +166,6 @@ static List * PostLoadShardCreationCommandList(ShardInterval *shardInterval,
int32 sourceNodePort);
static ShardCommandList * CreateShardCommandList(ShardInterval *shardInterval,
List *ddlCommandList);
static char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);
static void AcquireShardPlacementLock(uint64_t shardId, int lockMode, Oid relationId,
const char *operationName);
@@ -2074,7 +2073,7 @@ CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode, List *shardInte
* worker node. This command needs to be run on the node where you want to copy
* the shard from.
*/
static char *
char *
CreateShardCopyCommand(ShardInterval *shard,
WorkerNode *targetNode)
{
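
CreateShardCopyCommand() loses its static qualifier (its file-local forward declaration is removed above) so that replicate_none_dist_table_shard.c can reuse it, with the matching extern added to shard_transfer.h below. For orientation, in current Citus sources the pre-existing body, which this commit does not change, builds a worker_copy_table_to_node() call along these lines:

	char *
	CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode)
	{
		char *shardName = ConstructQualifiedShardName(shard);

		StringInfo query = makeStringInfo();
		appendStringInfo(query,
						 "SELECT pg_catalog.worker_copy_table_to_node(%s::regclass, %u);",
						 quote_literal_cstr(shardName),
						 targetNode->nodeId);
		return query->data;
	}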

View File

@@ -325,7 +325,7 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement);
extern bool IsNonCoordShardPlacement(ShardPlacement *shardPlacement);
extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
ShardPlacement *));

View File

@@ -76,6 +76,7 @@ extern uint64 ShardListSizeInBytes(List *colocatedShardList,
extern void ErrorIfMoveUnsupportedTableType(Oid relationId);
extern void CopyShardsToNode(WorkerNode *sourceNode, WorkerNode *targetNode,
List *shardIntervalList, char *snapshotName);
extern char * CreateShardCopyCommand(ShardInterval *shard, WorkerNode *targetNode);
extern void VerifyTablesHaveReplicaIdentity(List *colocatedTableList);
extern bool RelationCanPublishAllModifications(Oid relationId);
extern void UpdatePlacementUpdateStatusForShardIntervalList(List *shardIntervalList,