From 0629766e575b39be5521be87f07c739c5d7bbe9a Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 11 Nov 2025 13:52:30 +0300 Subject: [PATCH] allow supported forms of "create table" variants from any node - create table - create table as - create table partition of - create table partitioned by --- .../distributed/commands/alter_table.c | 4 +- .../commands/create_distributed_table.c | 54 ++++-- .../commands/schema_based_sharding.c | 25 +-- src/backend/distributed/commands/table.c | 4 +- .../distributed/metadata/metadata_sync.c | 68 +++---- .../distributed/metadata/metadata_utility.c | 2 +- .../distributed/operations/node_protocol.c | 180 +++++++++++++++++- src/include/distributed/metadata_sync.h | 2 +- src/include/distributed/metadata_utility.h | 3 +- .../expected/schema_based_sharding.out | 12 +- .../regress/sql/schema_based_sharding.sql | 13 +- 11 files changed, 271 insertions(+), 96 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index d2f8348da..978c868b5 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1524,7 +1524,9 @@ CreateCitusTableLike(TableConversionState *con) .colocateWithTableName = quote_qualified_identifier(con->schemaName, con->relationName) }; - CreateSingleShardTable(con->newRelationId, colocationParam); + bool allowFromWorkersIfPostgresTable = false; + CreateSingleShardTable(con->newRelationId, colocationParam, + allowFromWorkersIfPostgresTable); } else { diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 38fadb0f3..c8456bc27 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -142,7 +142,8 @@ static CitusTableParams DecideCitusTableParams(CitusTableType tableType, DistributedTableParams * distributedTableParams); static void CreateCitusTable(Oid relationId, CitusTableType tableType, - DistributedTableParams *distributedTableParams); + DistributedTableParams *distributedTableParams, + bool allowFromWorkers); static void ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, DistributedTableParams * @@ -163,7 +164,7 @@ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod); static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod); -static void EnsureCitusTableCanBeCreated(Oid relationOid); +static void EnsureCitusTableCanBeCreated(Oid relationOid, bool allowFromWorkers); static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId); static void EnsureDistributedSequencesHaveOneType(Oid relationId, List *seqInfoList); @@ -302,7 +303,9 @@ create_distributed_table(PG_FUNCTION_ARGS) .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = colocateWithTableName, }; - CreateSingleShardTable(relationId, colocationParam); + bool allowFromWorkersIfPostgresTable = false; + CreateSingleShardTable(relationId, colocationParam, + allowFromWorkersIfPostgresTable); } PG_RETURN_VOID(); @@ -416,7 +419,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, DropOrphanedResourcesInSeparateTransaction(); - EnsureCitusTableCanBeCreated(relationId); + bool allowFromWorkers = false; + EnsureCitusTableCanBeCreated(relationId, 
allowFromWorkers); EnsureValidDistributionColumn(relationId, distributionColumnName); @@ -938,15 +942,23 @@ create_reference_table(PG_FUNCTION_ARGS) /* * EnsureCitusTableCanBeCreated checks if - * - we are on the coordinator + * - we are on the coordinator if allowFromWorkers = false or else if we can ensure propagation to coordinator * - the current user is the owner of the table * - relation kind is supported * - relation is not a shard */ static void -EnsureCitusTableCanBeCreated(Oid relationOid) +EnsureCitusTableCanBeCreated(Oid relationOid, bool allowFromWorkers) { - EnsureCoordinator(); + if (allowFromWorkers) + { + EnsurePropagationToCoordinator(); + } + else + { + EnsureCoordinator(); + } + EnsureRelationExists(relationOid); EnsureTableOwner(relationOid); ErrorIfTemporaryTable(relationOid); @@ -1031,9 +1043,11 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, }, .shardCount = shardCount, .shardCountIsStrict = shardCountIsStrict, - .distributionColumnName = distributionColumnName + .distributionColumnName = distributionColumnName, }; - CreateCitusTable(relationId, tableType, &distributedTableParams); + + bool allowFromWorkers = false; + CreateCitusTable(relationId, tableType, &distributedTableParams, allowFromWorkers); } @@ -1053,7 +1067,8 @@ CreateReferenceTable(Oid relationId) } else { - CreateCitusTable(relationId, REFERENCE_TABLE, NULL); + bool allowFromWorkers = false; + CreateCitusTable(relationId, REFERENCE_TABLE, NULL, allowFromWorkers); } } @@ -1063,13 +1078,14 @@ CreateReferenceTable(Oid relationId) * doesn't have a shard key. */ void -CreateSingleShardTable(Oid relationId, ColocationParam colocationParam) +CreateSingleShardTable(Oid relationId, ColocationParam colocationParam, + bool allowFromWorkersIfPostgresTable) { DistributedTableParams distributedTableParams = { .colocationParam = colocationParam, .shardCount = 1, .shardCountIsStrict = true, - .distributionColumnName = NULL + .distributionColumnName = NULL, }; if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) @@ -1084,7 +1100,8 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam) } else { - CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams); + CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams, + allowFromWorkersIfPostgresTable); } } @@ -1104,7 +1121,8 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam) */ static void CreateCitusTable(Oid relationId, CitusTableType tableType, - DistributedTableParams *distributedTableParams) + DistributedTableParams *distributedTableParams, + bool allowFromWorkers) { if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) != @@ -1115,7 +1133,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, "not be otherwise"))); } - EnsureCitusTableCanBeCreated(relationId); + EnsureCitusTableCanBeCreated(relationId, allowFromWorkers); /* allow creating a Citus table on an empty cluster */ InsertCoordinatorIfClusterEmpty(); @@ -1349,7 +1367,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType, .distributionColumnName = distributedTableParams->distributionColumnName, }; CreateCitusTable(partitionRelationId, tableType, - &childDistributedTableParams); + &childDistributedTableParams, + allowFromWorkers); } MemoryContextSwitchTo(oldContext); @@ -1407,7 +1426,8 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, "not be otherwise"))); } - 
EnsureCitusTableCanBeCreated(relationId); + bool allowFromWorkers = false; + EnsureCitusTableCanBeCreated(relationId, allowFromWorkers); Relation relation = try_relation_open(relationId, ExclusiveLock); if (relation == NULL) diff --git a/src/backend/distributed/commands/schema_based_sharding.c b/src/backend/distributed/commands/schema_based_sharding.c index 57c119b76..05b7260c4 100644 --- a/src/backend/distributed/commands/schema_based_sharding.c +++ b/src/backend/distributed/commands/schema_based_sharding.c @@ -265,23 +265,7 @@ EnsureFKeysForTenantTable(Oid relationId) void CreateTenantSchemaTable(Oid relationId) { - if (!IsCoordinator()) - { - /* - * We don't support creating tenant tables from workers. We could - * let ShouldCreateTenantSchemaTable() to return false to allow users - * to create a local table as usual but that would be confusing because - * it might sound like we allow creating tenant tables from workers. - * For this reason, we prefer to throw an error instead. - * - * Indeed, CreateSingleShardTable() would already do so but we - * prefer to throw an error with a more meaningful message, rather - * than saying "operation is not allowed on this node". - */ - ereport(ERROR, (errmsg("cannot create tables in a distributed schema from " - "a worker node"), - errhint("Connect to the coordinator node and try again."))); - } + EnsurePropagationToCoordinator(); EnsureTableKindSupportedForTenantSchema(relationId); @@ -301,7 +285,8 @@ CreateTenantSchemaTable(Oid relationId) .colocationParamType = COLOCATE_WITH_COLOCATION_ID, .colocationId = colocationId, }; - CreateSingleShardTable(relationId, colocationParam); + bool allowFromWorkersIfPostgresTable = true; + CreateSingleShardTable(relationId, colocationParam, allowFromWorkersIfPostgresTable); } @@ -696,7 +681,9 @@ citus_schema_distribute(PG_FUNCTION_ARGS) originalForeignKeyRecreationCommands, fkeyCommandsForRelation); DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES); - CreateSingleShardTable(relationId, colocationParam); + bool allowFromWorkersIfPostgresTable = false; /* TODOTASK: should we allow? 
*/ + CreateSingleShardTable(relationId, colocationParam, + allowFromWorkersIfPostgresTable); } /* We can skip foreign key validations as we are sure about them at start */ diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index eaa8b1031..23e729f0b 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -666,7 +666,9 @@ DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationI .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = parentRelationName, }; - CreateSingleShardTable(partitionRelationId, colocationParam); + bool allowFromWorkersIfPostgresTable = false; + CreateSingleShardTable(partitionRelationId, colocationParam, + allowFromWorkersIfPostgresTable); return; } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 892a784b9..496e74fae 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -105,9 +105,9 @@ static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress); static char * SchemaOwnerName(Oid objectId); static bool HasMetadataWorkers(void); -static void CreateShellTableOnWorkers(Oid relationId); -static void CreateTableMetadataOnWorkers(Oid relationId); -static void CreateDependingViewsOnWorkers(Oid relationId); +static void CreateShellTableOnRemoteNodes(Oid relationId); +static void CreateTableMetadataOnRemoteNodes(Oid relationId); +static void CreateDependingViewsOnRemoteNodes(Oid relationId); static void AddTableToPublications(Oid relationId); static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void); static bool ShouldSyncTableMetadataInternal(bool hashDistributed, @@ -264,19 +264,19 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) /* - * SyncCitusTableMetadata syncs citus table metadata to worker nodes with metadata. + * SyncCitusTableMetadata syncs citus table metadata to remote nodes with metadata. * Our definition of metadata includes the shell table and its inter relations with * other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard * and pg_dist_shard placement entries. This function also propagates the views that - * depend on the given relation, to the metadata workers, and adds the relation to + * depend on the given relation, to the remote metadata nodes, and adds the relation to * the appropriate publications. */ void SyncCitusTableMetadata(Oid relationId) { - CreateShellTableOnWorkers(relationId); - CreateTableMetadataOnWorkers(relationId); - CreateInterTableRelationshipOfRelationOnWorkers(relationId); + CreateShellTableOnRemoteNodes(relationId); + CreateTableMetadataOnRemoteNodes(relationId); + CreateInterTableRelationshipOfRelationOnRemoteNodes(relationId); if (!IsTableOwnedByExtension(relationId)) { @@ -285,17 +285,17 @@ SyncCitusTableMetadata(Oid relationId) MarkObjectDistributed(&relationAddress); } - CreateDependingViewsOnWorkers(relationId); + CreateDependingViewsOnRemoteNodes(relationId); AddTableToPublications(relationId); } /* - * CreateDependingViewsOnWorkers takes a relationId and creates the views that depend on - * that relation on workers with metadata. Propagated views are marked as distributed. + * CreateDependingViewsOnRemoteNodes takes a relationId and creates the views that depend on + * that relation on remote nodes with metadata. 
Propagated views are marked as distributed. */ static void -CreateDependingViewsOnWorkers(Oid relationId) +CreateDependingViewsOnRemoteNodes(Oid relationId) { List *views = GetDependingViews(relationId); @@ -305,7 +305,7 @@ CreateDependingViewsOnWorkers(Oid relationId) return; } - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION); Oid viewOid = InvalidOid; foreach_declared_oid(viewOid, views) @@ -322,18 +322,18 @@ CreateDependingViewsOnWorkers(Oid relationId) char *createViewCommand = CreateViewDDLCommand(viewOid); char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid); - SendCommandToWorkersWithMetadata(createViewCommand); - SendCommandToWorkersWithMetadata(alterViewOwnerCommand); + SendCommandToRemoteNodesWithMetadata(createViewCommand); + SendCommandToRemoteNodesWithMetadata(alterViewOwnerCommand); MarkObjectDistributed(viewAddress); } - SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(ENABLE_DDL_PROPAGATION); } /* - * AddTableToPublications adds the table to a publication on workers with metadata. + * AddTableToPublications adds the table to a publication on remote nodes with metadata. */ static void AddTableToPublications(Oid relationId) @@ -346,7 +346,7 @@ AddTableToPublications(Oid relationId) Oid publicationId = InvalidOid; - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION); foreach_declared_oid(publicationId, publicationIds) { @@ -368,10 +368,10 @@ AddTableToPublications(Oid relationId) GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd); /* send ALTER PUBLICATION .. ADD to workers with metadata */ - SendCommandToWorkersWithMetadata(alterPublicationCommand); + SendCommandToRemoteNodesWithMetadata(alterPublicationCommand); } - SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(ENABLE_DDL_PROPAGATION); } @@ -2752,11 +2752,11 @@ HasMetadataWorkers(void) /* - * CreateInterTableRelationshipOfRelationOnWorkers create inter table relationship - * for the the given relation id on each worker node with metadata. + * CreateInterTableRelationshipOfRelationOnRemoteNodes create inter table relationship + * for the the given relation id on each remote node with metadata. */ void -CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId) +CreateInterTableRelationshipOfRelationOnRemoteNodes(Oid relationId) { /* if the table is owned by an extension we don't create */ bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); @@ -2769,12 +2769,12 @@ CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId) InterTableRelationshipOfRelationCommandList(relationId); /* prevent recursive propagation */ - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION); const char *command = NULL; foreach_declared_ptr(command, commandList) { - SendCommandToWorkersWithMetadata(command); + SendCommandToRemoteNodesWithMetadata(command); } } @@ -2802,11 +2802,11 @@ InterTableRelationshipOfRelationCommandList(Oid relationId) /* - * CreateShellTableOnWorkers creates the shell table on each worker node with metadata + * CreateShellTableOnRemoteNodes creates the shell table on each remote node with metadata * including sequence dependency and truncate triggers. 
*/ static void -CreateShellTableOnWorkers(Oid relationId) +CreateShellTableOnRemoteNodes(Oid relationId) { if (IsTableOwnedByExtension(relationId)) { @@ -2834,31 +2834,31 @@ CreateShellTableOnWorkers(Oid relationId) const char *command = NULL; foreach_declared_ptr(command, commandList) { - SendCommandToWorkersWithMetadata(command); + SendCommandToRemoteNodesWithMetadata(command); } } /* - * CreateTableMetadataOnWorkers creates the list of commands needed to create the - * metadata of the given distributed table and sends these commands to all metadata - * workers i.e. workers with hasmetadata=true. Before sending the commands, in order + * CreateTableMetadataOnRemoteNodes creates the list of commands needed to + * create the metadata of the given distributed table and sends these commands to all + * remote metadata nodes i.e. hasmetadata=true. Before sending the commands, in order * to prevent recursive propagation, DDL propagation on workers are disabled with a * `SET citus.enable_ddl_propagation TO off;` command. */ static void -CreateTableMetadataOnWorkers(Oid relationId) +CreateTableMetadataOnRemoteNodes(Oid relationId) { List *commandList = CitusTableMetadataCreateCommandList(relationId); /* prevent recursive propagation */ - SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION); /* send the commands one by one */ const char *command = NULL; foreach_declared_ptr(command, commandList) { - SendCommandToWorkersWithMetadata(command); + SendCommandToRemoteNodesWithMetadata(command); } } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 1fb3d6fd0..72196dfef 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1885,7 +1885,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, if (placementId == INVALID_PLACEMENT_ID) { - placementId = master_get_new_placementid(NULL); + placementId = GetNextPlacementId(); } values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementId); values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId); diff --git a/src/backend/distributed/operations/node_protocol.c b/src/backend/distributed/operations/node_protocol.c index 680bda22f..49dfb6678 100644 --- a/src/backend/distributed/operations/node_protocol.c +++ b/src/backend/distributed/operations/node_protocol.c @@ -60,10 +60,12 @@ #include "distributed/coordinator_protocol.h" #include "distributed/deparser.h" #include "distributed/listutils.h" +#include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/namespace_utils.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/version_compat.h" #include "distributed/worker_manager.h" @@ -74,6 +76,10 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */ int NextShardId = 0; int NextPlacementId = 0; +static int64 GetNextShardIdFromNode(WorkerNode *node); +static uint64 GetNextShardIdInternal(void); +static int64 GetNextPlacementIdFromNode(WorkerNode *node); +static uint64 GetNextPlacementIdInternal(void); static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index indexForm, List ** @@ -188,7 +194,84 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) /* - * master_get_new_shardid is a user facing wrapper 
function around GetNextShardId() + * GetNextShardId retrieves the next shard id either from the local + * node if it's the coordinator or retrieves it from the coordinator otherwise. + * + * Throws an error for the latter case if the coordinator is not in metadata. + */ +uint64 +GetNextShardId(void) +{ + uint64 shardId = INVALID_SHARD_ID; + if (IsCoordinator()) + { + shardId = GetNextShardIdInternal(); + } + else + { + /* + * If we're not on the coordinator, retrieve the next id from the + * coordinator node. Although all nodes have the sequence, we don't + * synchronize the sequences that are part of the Citus metadata + * across nodes, so we need to get the next value from the + * coordinator. + * + * Note that before this point, we should have already verified + * that coordinator is added into the metadata. + */ + WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError(); + shardId = GetNextShardIdFromNode(coordinator); + } + + return shardId; +} + + +/* + * GetNextShardIdFromNode gets the next shard id from given + * node by calling pg_catalog.master_get_new_shardid() function. + */ +static int64 +GetNextShardIdFromNode(WorkerNode *node) +{ + const char *nodeName = node->workerName; + int nodePort = node->workerPort; + uint32 connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + + int querySent = SendRemoteCommand(connection, + "SELECT pg_catalog.master_get_new_shardid();"); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + if (rowCount != 1 || colCount != 1) + { + ereport(ERROR, (errmsg("unexpected result from the node when getting " + "next shard id"))); + } + + int64 shardId = ParseIntField(result, 0, 0); + + PQclear(result); + ForgetResults(connection); + + return shardId; +} + + +/* + * master_get_new_shardid is a user facing wrapper function around GetNextShardIdInternal() * which allocates and returns a unique shardId for the shard to be created. * * NB: This can be called by any user; for now we have decided that that's @@ -201,7 +284,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); EnsureCoordinator(); - uint64 shardId = GetNextShardId(); + uint64 shardId = GetNextShardIdInternal(); Datum shardIdDatum = Int64GetDatum(shardId); PG_RETURN_DATUM(shardIdDatum); @@ -209,15 +292,15 @@ master_get_new_shardid(PG_FUNCTION_ARGS) /* - * GetNextShardId allocates and returns a unique shardId for the shard to be + * GetNextShardIdInternal allocates and returns a unique shardId for the shard to be * created. This allocation occurs both in shared memory and in write ahead * logs; writing to logs avoids the risk of having shardId collisions. * * Please note that the caller is still responsible for finalizing shard data * and the shardId with the master node. */ -uint64 -GetNextShardId() +static uint64 +GetNextShardIdInternal(void) { Oid savedUserId = InvalidOid; int savedSecurityContext = 0; @@ -256,9 +339,86 @@ GetNextShardId() } +/* + * GetNextPlacementId retrieves the next placement id either from the local + * node if it's the coordinator or retrieves it from the coordinator otherwise. + * + * Throws an error for the latter case if the coordinator is not in metadata. 
+ */ +uint64 +GetNextPlacementId(void) +{ + uint64 placementId = INVALID_PLACEMENT_ID; + if (IsCoordinator()) + { + placementId = GetNextPlacementIdInternal(); + } + else + { + /* + * If we're not on the coordinator, retrieve the next id from the + * coordinator node. Although all nodes have the sequence, we don't + * synchronize the sequences that are part of the Citus metadata + * across nodes, so we need to get the next value from the + * coordinator. + * + * Note that before this point, we should have already verified + * that coordinator is added into the metadata. + */ + WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError(); + placementId = GetNextPlacementIdFromNode(coordinator); + } + + return placementId; +} + + +/* + * GetNextPlacementIdFromNode gets the next placement id from given + * node by calling pg_catalog.master_get_new_placementid() function. + */ +static int64 +GetNextPlacementIdFromNode(WorkerNode *node) +{ + const char *nodeName = node->workerName; + int nodePort = node->workerPort; + uint32 connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + + int querySent = SendRemoteCommand(connection, + "SELECT pg_catalog.master_get_new_placementid();"); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + if (rowCount != 1 || colCount != 1) + { + ereport(ERROR, (errmsg("unexpected result from the node when getting " + "next placement id"))); + } + + int64 placementId = ParseIntField(result, 0, 0); + + PQclear(result); + ForgetResults(connection); + + return placementId; +} + + /* * master_get_new_placementid is a user facing wrapper function around - * GetNextPlacementId() which allocates and returns a unique placement id for the + * GetNextPlacementIdInternal() which allocates and returns a unique placement id for the * placement to be created. * * NB: This can be called by any user; for now we have decided that that's @@ -271,7 +431,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); EnsureCoordinator(); - uint64 placementId = GetNextPlacementId(); + uint64 placementId = GetNextPlacementIdInternal(); Datum placementIdDatum = Int64GetDatum(placementId); PG_RETURN_DATUM(placementIdDatum); @@ -279,7 +439,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS) /* - * GetNextPlacementId allocates and returns a unique placementId for + * GetNextPlacementIdInternal allocates and returns a unique placementId for * the placement to be created. This allocation occurs both in shared memory * and in write ahead logs; writing to logs avoids the risk of having placementId * collisions. @@ -288,8 +448,8 @@ master_get_new_placementid(PG_FUNCTION_ARGS) * ok. We might want to restrict this to users part of a specific role or such * at some later point. 
*/ -uint64 -GetNextPlacementId(void) +static uint64 +GetNextPlacementIdInternal(void) { Oid savedUserId = InvalidOid; int savedSecurityContext = 0; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 29583f01f..6ab8b3937 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -117,7 +117,7 @@ extern List * GenerateGrantOnFDWQueriesFromAclItem(Oid serverId, AclItem *aclIte extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId); extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); -extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); +extern void CreateInterTableRelationshipOfRelationOnRemoteNodes(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); extern List * DetachPartitionCommandList(void); extern void SyncNodeMetadataToNodes(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index b72e2d396..087bbcb2e 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -376,7 +376,8 @@ extern void DeleteShardRow(uint64 shardId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void DeleteShardPlacementRowGlobally(uint64 placementId); extern void DeleteShardPlacementRow(uint64 placementId); -extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam); +extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam, + bool allowFromWorkersIfPostgresTable); extern void CreateDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, int shardCount, bool shardCountIsStrict, char *colocateWithTableName); diff --git a/src/test/regress/expected/schema_based_sharding.out b/src/test/regress/expected/schema_based_sharding.out index 7e31cb597..a0907145c 100644 --- a/src/test/regress/expected/schema_based_sharding.out +++ b/src/test/regress/expected/schema_based_sharding.out @@ -1483,15 +1483,13 @@ REVOKE CREATE ON DATABASE regression FROM test_non_super_user; REVOKE CREATE ON SCHEMA public FROM test_non_super_user; DROP ROLE test_non_super_user; \c - - - :worker_1_port --- test creating a tenant table from workers -CREATE TABLE tenant_3.tbl_1(a int, b text); -ERROR: cannot create tables in a distributed schema from a worker node -HINT: Connect to the coordinator node and try again. -- test creating a tenant schema from workers SET citus.enable_schema_based_sharding TO ON; CREATE SCHEMA worker_tenant_schema; -DROP SCHEMA worker_tenant_schema; -SET citus.enable_schema_based_sharding TO OFF; +-- test creating a tenant table from workers +SET citus.shard_replication_factor TO 1; +CREATE TABLE worker_tenant_schema.tbl_1(a int, b text); +RESET citus.shard_replication_factor; -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE -- commands that we send to workers don't recursively try creating a -- tenant schema / table. 
@@ -1516,6 +1514,8 @@ SELECT pg_reload_conf(); SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3'); ERROR: schema is expected to be already dropped because this function is only expected to be called from Citus drop hook \c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA worker_tenant_schema CASCADE; SET search_path TO regular_schema; SET citus.next_shard_id TO 1950000; SET citus.shard_count TO 32; diff --git a/src/test/regress/sql/schema_based_sharding.sql b/src/test/regress/sql/schema_based_sharding.sql index ad437e7d8..3754ae8e6 100644 --- a/src/test/regress/sql/schema_based_sharding.sql +++ b/src/test/regress/sql/schema_based_sharding.sql @@ -1003,14 +1003,14 @@ DROP ROLE test_non_super_user; \c - - - :worker_1_port --- test creating a tenant table from workers -CREATE TABLE tenant_3.tbl_1(a int, b text); - -- test creating a tenant schema from workers SET citus.enable_schema_based_sharding TO ON; CREATE SCHEMA worker_tenant_schema; -DROP SCHEMA worker_tenant_schema; -SET citus.enable_schema_based_sharding TO OFF; + +-- test creating a tenant table from workers +SET citus.shard_replication_factor TO 1; +CREATE TABLE worker_tenant_schema.tbl_1(a int, b text); +RESET citus.shard_replication_factor; -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE -- commands that we send to workers don't recursively try creating a @@ -1030,6 +1030,9 @@ SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace \c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA worker_tenant_schema CASCADE; + SET search_path TO regular_schema; SET citus.next_shard_id TO 1950000; SET citus.shard_count TO 32;
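
Example (illustrative sketch mirroring the regress test above; the schema and table
names here are placeholders): with this change, the intent is that a tenant table can
be created from a worker node that has metadata, as long as the coordinator is
registered in pg_dist_node:

    -- run on a worker node with metadata
    SET citus.enable_schema_based_sharding TO ON;
    CREATE SCHEMA sketch_tenant_schema;
    SET citus.shard_replication_factor TO 1;
    CREATE TABLE sketch_tenant_schema.tbl_1(a int, b text);
    RESET citus.shard_replication_factor;

Callers that intentionally remain coordinator-only (create_distributed_table(),
create_reference_table(), the ALTER TABLE conversion path in alter_table.c, and
partition distribution in table.c) keep passing allowFromWorkers = false /
allowFromWorkersIfPostgresTable = false.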