allow supported forms of "create table" variants from any node

- create table
- create table as
- create table partition of
- create table partitioned by
ddl-from-any-node-phase-1
Onur Tirtir 2025-11-11 13:52:30 +03:00
parent 578b996401
commit 0629766e57
11 changed files with 271 additions and 96 deletions

View File

@ -1524,7 +1524,9 @@ CreateCitusTableLike(TableConversionState *con)
.colocateWithTableName = quote_qualified_identifier(con->schemaName, .colocateWithTableName = quote_qualified_identifier(con->schemaName,
con->relationName) con->relationName)
}; };
CreateSingleShardTable(con->newRelationId, colocationParam); bool allowFromWorkersIfPostgresTable = false;
CreateSingleShardTable(con->newRelationId, colocationParam,
allowFromWorkersIfPostgresTable);
} }
else else
{ {

View File

@ -142,7 +142,8 @@ static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
DistributedTableParams * DistributedTableParams *
distributedTableParams); distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType, static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams); DistributedTableParams *distributedTableParams,
bool allowFromWorkers);
static void ConvertCitusLocalTableToTableType(Oid relationId, static void ConvertCitusLocalTableToTableType(Oid relationId,
CitusTableType tableType, CitusTableType tableType,
DistributedTableParams * DistributedTableParams *
@ -163,7 +164,7 @@ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber); int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod); static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod);
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod); static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod);
static void EnsureCitusTableCanBeCreated(Oid relationOid); static void EnsureCitusTableCanBeCreated(Oid relationOid, bool allowFromWorkers);
static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId); static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId);
static void EnsureDistributedSequencesHaveOneType(Oid relationId, static void EnsureDistributedSequencesHaveOneType(Oid relationId,
List *seqInfoList); List *seqInfoList);
@ -302,7 +303,9 @@ create_distributed_table(PG_FUNCTION_ARGS)
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = colocateWithTableName, .colocateWithTableName = colocateWithTableName,
}; };
CreateSingleShardTable(relationId, colocationParam); bool allowFromWorkersIfPostgresTable = false;
CreateSingleShardTable(relationId, colocationParam,
allowFromWorkersIfPostgresTable);
} }
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -416,7 +419,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
DropOrphanedResourcesInSeparateTransaction(); DropOrphanedResourcesInSeparateTransaction();
EnsureCitusTableCanBeCreated(relationId); bool allowFromWorkers = false;
EnsureCitusTableCanBeCreated(relationId, allowFromWorkers);
EnsureValidDistributionColumn(relationId, distributionColumnName); EnsureValidDistributionColumn(relationId, distributionColumnName);
@ -938,15 +942,23 @@ create_reference_table(PG_FUNCTION_ARGS)
/* /*
* EnsureCitusTableCanBeCreated checks if * EnsureCitusTableCanBeCreated checks if
* - we are on the coordinator * - we are on the coordinator if allowFromWorkers = false or else if we can ensure propagation to coordinator
* - the current user is the owner of the table * - the current user is the owner of the table
* - relation kind is supported * - relation kind is supported
* - relation is not a shard * - relation is not a shard
*/ */
static void static void
EnsureCitusTableCanBeCreated(Oid relationOid) EnsureCitusTableCanBeCreated(Oid relationOid, bool allowFromWorkers)
{ {
EnsureCoordinator(); if (allowFromWorkers)
{
EnsurePropagationToCoordinator();
}
else
{
EnsureCoordinator();
}
EnsureRelationExists(relationOid); EnsureRelationExists(relationOid);
EnsureTableOwner(relationOid); EnsureTableOwner(relationOid);
ErrorIfTemporaryTable(relationOid); ErrorIfTemporaryTable(relationOid);
@ -1031,9 +1043,11 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
}, },
.shardCount = shardCount, .shardCount = shardCount,
.shardCountIsStrict = shardCountIsStrict, .shardCountIsStrict = shardCountIsStrict,
.distributionColumnName = distributionColumnName .distributionColumnName = distributionColumnName,
}; };
CreateCitusTable(relationId, tableType, &distributedTableParams);
bool allowFromWorkers = false;
CreateCitusTable(relationId, tableType, &distributedTableParams, allowFromWorkers);
} }
@ -1053,7 +1067,8 @@ CreateReferenceTable(Oid relationId)
} }
else else
{ {
CreateCitusTable(relationId, REFERENCE_TABLE, NULL); bool allowFromWorkers = false;
CreateCitusTable(relationId, REFERENCE_TABLE, NULL, allowFromWorkers);
} }
} }
@ -1063,13 +1078,14 @@ CreateReferenceTable(Oid relationId)
* doesn't have a shard key. * doesn't have a shard key.
*/ */
void void
CreateSingleShardTable(Oid relationId, ColocationParam colocationParam) CreateSingleShardTable(Oid relationId, ColocationParam colocationParam,
bool allowFromWorkersIfPostgresTable)
{ {
DistributedTableParams distributedTableParams = { DistributedTableParams distributedTableParams = {
.colocationParam = colocationParam, .colocationParam = colocationParam,
.shardCount = 1, .shardCount = 1,
.shardCountIsStrict = true, .shardCountIsStrict = true,
.distributionColumnName = NULL .distributionColumnName = NULL,
}; };
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
@ -1084,7 +1100,8 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
} }
else else
{ {
CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams); CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams,
allowFromWorkersIfPostgresTable);
} }
} }
@ -1104,7 +1121,8 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
*/ */
static void static void
CreateCitusTable(Oid relationId, CitusTableType tableType, CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams) DistributedTableParams *distributedTableParams,
bool allowFromWorkers)
{ {
if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED || if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) != tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) !=
@ -1115,7 +1133,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
"not be otherwise"))); "not be otherwise")));
} }
EnsureCitusTableCanBeCreated(relationId); EnsureCitusTableCanBeCreated(relationId, allowFromWorkers);
/* allow creating a Citus table on an empty cluster */ /* allow creating a Citus table on an empty cluster */
InsertCoordinatorIfClusterEmpty(); InsertCoordinatorIfClusterEmpty();
@ -1349,7 +1367,8 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
.distributionColumnName = distributedTableParams->distributionColumnName, .distributionColumnName = distributedTableParams->distributionColumnName,
}; };
CreateCitusTable(partitionRelationId, tableType, CreateCitusTable(partitionRelationId, tableType,
&childDistributedTableParams); &childDistributedTableParams,
allowFromWorkers);
} }
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -1407,7 +1426,8 @@ ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
"not be otherwise"))); "not be otherwise")));
} }
EnsureCitusTableCanBeCreated(relationId); bool allowFromWorkers = false;
EnsureCitusTableCanBeCreated(relationId, allowFromWorkers);
Relation relation = try_relation_open(relationId, ExclusiveLock); Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL) if (relation == NULL)

View File

@ -265,23 +265,7 @@ EnsureFKeysForTenantTable(Oid relationId)
void void
CreateTenantSchemaTable(Oid relationId) CreateTenantSchemaTable(Oid relationId)
{ {
if (!IsCoordinator()) EnsurePropagationToCoordinator();
{
/*
* We don't support creating tenant tables from workers. We could
* let ShouldCreateTenantSchemaTable() to return false to allow users
* to create a local table as usual but that would be confusing because
* it might sound like we allow creating tenant tables from workers.
* For this reason, we prefer to throw an error instead.
*
* Indeed, CreateSingleShardTable() would already do so but we
* prefer to throw an error with a more meaningful message, rather
* than saying "operation is not allowed on this node".
*/
ereport(ERROR, (errmsg("cannot create tables in a distributed schema from "
"a worker node"),
errhint("Connect to the coordinator node and try again.")));
}
EnsureTableKindSupportedForTenantSchema(relationId); EnsureTableKindSupportedForTenantSchema(relationId);
@ -301,7 +285,8 @@ CreateTenantSchemaTable(Oid relationId)
.colocationParamType = COLOCATE_WITH_COLOCATION_ID, .colocationParamType = COLOCATE_WITH_COLOCATION_ID,
.colocationId = colocationId, .colocationId = colocationId,
}; };
CreateSingleShardTable(relationId, colocationParam); bool allowFromWorkersIfPostgresTable = true;
CreateSingleShardTable(relationId, colocationParam, allowFromWorkersIfPostgresTable);
} }
@ -696,7 +681,9 @@ citus_schema_distribute(PG_FUNCTION_ARGS)
originalForeignKeyRecreationCommands, fkeyCommandsForRelation); originalForeignKeyRecreationCommands, fkeyCommandsForRelation);
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES); DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);
CreateSingleShardTable(relationId, colocationParam); bool allowFromWorkersIfPostgresTable = false; /* TODOTASK: should we allow? */
CreateSingleShardTable(relationId, colocationParam,
allowFromWorkersIfPostgresTable);
} }
/* We can skip foreign key validations as we are sure about them at start */ /* We can skip foreign key validations as we are sure about them at start */

View File

@ -666,7 +666,9 @@ DistributePartitionUsingParent(Oid parentCitusRelationId, Oid partitionRelationI
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = parentRelationName, .colocateWithTableName = parentRelationName,
}; };
CreateSingleShardTable(partitionRelationId, colocationParam); bool allowFromWorkersIfPostgresTable = false;
CreateSingleShardTable(partitionRelationId, colocationParam,
allowFromWorkersIfPostgresTable);
return; return;
} }

View File

@ -105,9 +105,9 @@ static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress); static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress);
static char * SchemaOwnerName(Oid objectId); static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void); static bool HasMetadataWorkers(void);
static void CreateShellTableOnWorkers(Oid relationId); static void CreateShellTableOnRemoteNodes(Oid relationId);
static void CreateTableMetadataOnWorkers(Oid relationId); static void CreateTableMetadataOnRemoteNodes(Oid relationId);
static void CreateDependingViewsOnWorkers(Oid relationId); static void CreateDependingViewsOnRemoteNodes(Oid relationId);
static void AddTableToPublications(Oid relationId); static void AddTableToPublications(Oid relationId);
static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void); static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed, static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
@ -264,19 +264,19 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
/* /*
* SyncCitusTableMetadata syncs citus table metadata to worker nodes with metadata. * SyncCitusTableMetadata syncs citus table metadata to remote nodes with metadata.
* Our definition of metadata includes the shell table and its inter relations with * Our definition of metadata includes the shell table and its inter relations with
* other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard * other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard
* and pg_dist_shard placement entries. This function also propagates the views that * and pg_dist_shard placement entries. This function also propagates the views that
* depend on the given relation, to the metadata workers, and adds the relation to * depend on the given relation, to the remote metadata nodes, and adds the relation to
* the appropriate publications. * the appropriate publications.
*/ */
void void
SyncCitusTableMetadata(Oid relationId) SyncCitusTableMetadata(Oid relationId)
{ {
CreateShellTableOnWorkers(relationId); CreateShellTableOnRemoteNodes(relationId);
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnRemoteNodes(relationId);
CreateInterTableRelationshipOfRelationOnWorkers(relationId); CreateInterTableRelationshipOfRelationOnRemoteNodes(relationId);
if (!IsTableOwnedByExtension(relationId)) if (!IsTableOwnedByExtension(relationId))
{ {
@ -285,17 +285,17 @@ SyncCitusTableMetadata(Oid relationId)
MarkObjectDistributed(&relationAddress); MarkObjectDistributed(&relationAddress);
} }
CreateDependingViewsOnWorkers(relationId); CreateDependingViewsOnRemoteNodes(relationId);
AddTableToPublications(relationId); AddTableToPublications(relationId);
} }
/* /*
* CreateDependingViewsOnWorkers takes a relationId and creates the views that depend on * CreateDependingViewsOnRemoteNodes takes a relationId and creates the views that depend on
* that relation on workers with metadata. Propagated views are marked as distributed. * that relation on remote nodes with metadata. Propagated views are marked as distributed.
*/ */
static void static void
CreateDependingViewsOnWorkers(Oid relationId) CreateDependingViewsOnRemoteNodes(Oid relationId)
{ {
List *views = GetDependingViews(relationId); List *views = GetDependingViews(relationId);
@ -305,7 +305,7 @@ CreateDependingViewsOnWorkers(Oid relationId)
return; return;
} }
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
Oid viewOid = InvalidOid; Oid viewOid = InvalidOid;
foreach_declared_oid(viewOid, views) foreach_declared_oid(viewOid, views)
@ -322,18 +322,18 @@ CreateDependingViewsOnWorkers(Oid relationId)
char *createViewCommand = CreateViewDDLCommand(viewOid); char *createViewCommand = CreateViewDDLCommand(viewOid);
char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid); char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid);
SendCommandToWorkersWithMetadata(createViewCommand); SendCommandToRemoteNodesWithMetadata(createViewCommand);
SendCommandToWorkersWithMetadata(alterViewOwnerCommand); SendCommandToRemoteNodesWithMetadata(alterViewOwnerCommand);
MarkObjectDistributed(viewAddress); MarkObjectDistributed(viewAddress);
} }
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(ENABLE_DDL_PROPAGATION);
} }
/* /*
* AddTableToPublications adds the table to a publication on workers with metadata. * AddTableToPublications adds the table to a publication on remote nodes with metadata.
*/ */
static void static void
AddTableToPublications(Oid relationId) AddTableToPublications(Oid relationId)
@ -346,7 +346,7 @@ AddTableToPublications(Oid relationId)
Oid publicationId = InvalidOid; Oid publicationId = InvalidOid;
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
foreach_declared_oid(publicationId, publicationIds) foreach_declared_oid(publicationId, publicationIds)
{ {
@ -368,10 +368,10 @@ AddTableToPublications(Oid relationId)
GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd); GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd);
/* send ALTER PUBLICATION .. ADD to workers with metadata */ /* send ALTER PUBLICATION .. ADD to workers with metadata */
SendCommandToWorkersWithMetadata(alterPublicationCommand); SendCommandToRemoteNodesWithMetadata(alterPublicationCommand);
} }
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(ENABLE_DDL_PROPAGATION);
} }
@ -2752,11 +2752,11 @@ HasMetadataWorkers(void)
/* /*
* CreateInterTableRelationshipOfRelationOnWorkers create inter table relationship * CreateInterTableRelationshipOfRelationOnRemoteNodes create inter table relationship
* for the the given relation id on each worker node with metadata. * for the the given relation id on each remote node with metadata.
*/ */
void void
CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId) CreateInterTableRelationshipOfRelationOnRemoteNodes(Oid relationId)
{ {
/* if the table is owned by an extension we don't create */ /* if the table is owned by an extension we don't create */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
@ -2769,12 +2769,12 @@ CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId)
InterTableRelationshipOfRelationCommandList(relationId); InterTableRelationshipOfRelationCommandList(relationId);
/* prevent recursive propagation */ /* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
const char *command = NULL; const char *command = NULL;
foreach_declared_ptr(command, commandList) foreach_declared_ptr(command, commandList)
{ {
SendCommandToWorkersWithMetadata(command); SendCommandToRemoteNodesWithMetadata(command);
} }
} }
@ -2802,11 +2802,11 @@ InterTableRelationshipOfRelationCommandList(Oid relationId)
/* /*
* CreateShellTableOnWorkers creates the shell table on each worker node with metadata * CreateShellTableOnRemoteNodes creates the shell table on each remote node with metadata
* including sequence dependency and truncate triggers. * including sequence dependency and truncate triggers.
*/ */
static void static void
CreateShellTableOnWorkers(Oid relationId) CreateShellTableOnRemoteNodes(Oid relationId)
{ {
if (IsTableOwnedByExtension(relationId)) if (IsTableOwnedByExtension(relationId))
{ {
@ -2834,31 +2834,31 @@ CreateShellTableOnWorkers(Oid relationId)
const char *command = NULL; const char *command = NULL;
foreach_declared_ptr(command, commandList) foreach_declared_ptr(command, commandList)
{ {
SendCommandToWorkersWithMetadata(command); SendCommandToRemoteNodesWithMetadata(command);
} }
} }
/* /*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the * CreateTableMetadataOnRemoteNodes creates the list of commands needed to
* metadata of the given distributed table and sends these commands to all metadata * create the metadata of the given distributed table and sends these commands to all
* workers i.e. workers with hasmetadata=true. Before sending the commands, in order * remote metadata nodes i.e. hasmetadata=true. Before sending the commands, in order
* to prevent recursive propagation, DDL propagation on workers are disabled with a * to prevent recursive propagation, DDL propagation on workers are disabled with a
* `SET citus.enable_ddl_propagation TO off;` command. * `SET citus.enable_ddl_propagation TO off;` command.
*/ */
static void static void
CreateTableMetadataOnWorkers(Oid relationId) CreateTableMetadataOnRemoteNodes(Oid relationId)
{ {
List *commandList = CitusTableMetadataCreateCommandList(relationId); List *commandList = CitusTableMetadataCreateCommandList(relationId);
/* prevent recursive propagation */ /* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */ /* send the commands one by one */
const char *command = NULL; const char *command = NULL;
foreach_declared_ptr(command, commandList) foreach_declared_ptr(command, commandList)
{ {
SendCommandToWorkersWithMetadata(command); SendCommandToRemoteNodesWithMetadata(command);
} }
} }

View File

@ -1885,7 +1885,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
if (placementId == INVALID_PLACEMENT_ID) if (placementId == INVALID_PLACEMENT_ID)
{ {
placementId = master_get_new_placementid(NULL); placementId = GetNextPlacementId();
} }
values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementId); values[Anum_pg_dist_placement_placementid - 1] = Int64GetDatum(placementId);
values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId); values[Anum_pg_dist_placement_shardid - 1] = Int64GetDatum(shardId);

View File

@ -60,10 +60,12 @@
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h" #include "distributed/deparser.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h" #include "distributed/metadata_sync.h"
#include "distributed/namespace_utils.h" #include "distributed/namespace_utils.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h" #include "distributed/shared_library_init.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -74,6 +76,10 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */
int NextShardId = 0; int NextShardId = 0;
int NextPlacementId = 0; int NextPlacementId = 0;
static int64 GetNextShardIdFromNode(WorkerNode *node);
static uint64 GetNextShardIdInternal(void);
static int64 GetNextPlacementIdFromNode(WorkerNode *node);
static uint64 GetNextPlacementIdInternal(void);
static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index static void GatherIndexAndConstraintDefinitionListExcludingReplicaIdentity(Form_pg_index
indexForm, indexForm,
List ** List **
@ -188,7 +194,84 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
/* /*
* master_get_new_shardid is a user facing wrapper function around GetNextShardId() * GetNextShardId retrieves the next shard id either from the local
* node if it's the coordinator or retrieves it from the coordinator otherwise.
*
* Throws an error for the latter case if the coordinator is not in metadata.
*/
uint64
GetNextShardId(void)
{
uint64 shardId = INVALID_SHARD_ID;
if (IsCoordinator())
{
shardId = GetNextShardIdInternal();
}
else
{
/*
* If we're not on the coordinator, retrieve the next id from the
* coordinator node. Although all nodes have the sequence, we don't
* synchronize the sequences that are part of the Citus metadata
* across nodes, so we need to get the next value from the
* coordinator.
*
* Note that before this point, we should have already verified
* that coordinator is added into the metadata.
*/
WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
shardId = GetNextShardIdFromNode(coordinator);
}
return shardId;
}
/*
* GetNextShardIdFromNode gets the next shard id from given
* node by calling pg_catalog.master_get_new_shardid() function.
*/
static int64
GetNextShardIdFromNode(WorkerNode *node)
{
const char *nodeName = node->workerName;
int nodePort = node->workerPort;
uint32 connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
int querySent = SendRemoteCommand(connection,
"SELECT pg_catalog.master_get_new_shardid();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
if (rowCount != 1 || colCount != 1)
{
ereport(ERROR, (errmsg("unexpected result from the node when getting "
"next shard id")));
}
int64 shardId = ParseIntField(result, 0, 0);
PQclear(result);
ForgetResults(connection);
return shardId;
}
/*
* master_get_new_shardid is a user facing wrapper function around GetNextShardIdInternal()
* which allocates and returns a unique shardId for the shard to be created. * which allocates and returns a unique shardId for the shard to be created.
* *
* NB: This can be called by any user; for now we have decided that that's * NB: This can be called by any user; for now we have decided that that's
@ -201,7 +284,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
uint64 shardId = GetNextShardId(); uint64 shardId = GetNextShardIdInternal();
Datum shardIdDatum = Int64GetDatum(shardId); Datum shardIdDatum = Int64GetDatum(shardId);
PG_RETURN_DATUM(shardIdDatum); PG_RETURN_DATUM(shardIdDatum);
@ -209,15 +292,15 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
/* /*
* GetNextShardId allocates and returns a unique shardId for the shard to be * GetNextShardIdInternal allocates and returns a unique shardId for the shard to be
* created. This allocation occurs both in shared memory and in write ahead * created. This allocation occurs both in shared memory and in write ahead
* logs; writing to logs avoids the risk of having shardId collisions. * logs; writing to logs avoids the risk of having shardId collisions.
* *
* Please note that the caller is still responsible for finalizing shard data * Please note that the caller is still responsible for finalizing shard data
* and the shardId with the master node. * and the shardId with the master node.
*/ */
uint64 static uint64
GetNextShardId() GetNextShardIdInternal(void)
{ {
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
@ -256,9 +339,86 @@ GetNextShardId()
} }
/*
* GetNextPlacementId retrieves the next placement id either from the local
* node if it's the coordinator or retrieves it from the coordinator otherwise.
*
* Throws an error for the latter case if the coordinator is not in metadata.
*/
uint64
GetNextPlacementId(void)
{
uint64 placementId = INVALID_PLACEMENT_ID;
if (IsCoordinator())
{
placementId = GetNextPlacementIdInternal();
}
else
{
/*
* If we're not on the coordinator, retrieve the next id from the
* coordinator node. Although all nodes have the sequence, we don't
* synchronize the sequences that are part of the Citus metadata
* across nodes, so we need to get the next value from the
* coordinator.
*
* Note that before this point, we should have already verified
* that coordinator is added into the metadata.
*/
WorkerNode *coordinator = CoordinatorNodeIfAddedAsWorkerOrError();
placementId = GetNextPlacementIdFromNode(coordinator);
}
return placementId;
}
/*
* GetNextPlacementIdFromNode gets the next placement id from given
* node by calling pg_catalog.master_get_new_placementid() function.
*/
static int64
GetNextPlacementIdFromNode(WorkerNode *node)
{
const char *nodeName = node->workerName;
int nodePort = node->workerPort;
uint32 connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
int querySent = SendRemoteCommand(connection,
"SELECT pg_catalog.master_get_new_placementid();");
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
bool raiseInterrupts = true;
PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, ERROR);
}
int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
if (rowCount != 1 || colCount != 1)
{
ereport(ERROR, (errmsg("unexpected result from the node when getting "
"next placement id")));
}
int64 placementId = ParseIntField(result, 0, 0);
PQclear(result);
ForgetResults(connection);
return placementId;
}
/* /*
* master_get_new_placementid is a user facing wrapper function around * master_get_new_placementid is a user facing wrapper function around
* GetNextPlacementId() which allocates and returns a unique placement id for the * GetNextPlacementIdInternal() which allocates and returns a unique placement id for the
* placement to be created. * placement to be created.
* *
* NB: This can be called by any user; for now we have decided that that's * NB: This can be called by any user; for now we have decided that that's
@ -271,7 +431,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
uint64 placementId = GetNextPlacementId(); uint64 placementId = GetNextPlacementIdInternal();
Datum placementIdDatum = Int64GetDatum(placementId); Datum placementIdDatum = Int64GetDatum(placementId);
PG_RETURN_DATUM(placementIdDatum); PG_RETURN_DATUM(placementIdDatum);
@ -279,7 +439,7 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
/* /*
* GetNextPlacementId allocates and returns a unique placementId for * GetNextPlacementIdInternal allocates and returns a unique placementId for
* the placement to be created. This allocation occurs both in shared memory * the placement to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having placementId * and in write ahead logs; writing to logs avoids the risk of having placementId
* collisions. * collisions.
@ -288,8 +448,8 @@ master_get_new_placementid(PG_FUNCTION_ARGS)
* ok. We might want to restrict this to users part of a specific role or such * ok. We might want to restrict this to users part of a specific role or such
* at some later point. * at some later point.
*/ */
uint64 static uint64
GetNextPlacementId(void) GetNextPlacementIdInternal(void)
{ {
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;

View File

@ -117,7 +117,7 @@ extern List * GenerateGrantOnFDWQueriesFromAclItem(Oid serverId, AclItem *aclIte
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId); uint64 shardLength, int32 groupId);
extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId); extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId); extern void CreateInterTableRelationshipOfRelationOnRemoteNodes(Oid relationId);
extern List * InterTableRelationshipOfRelationCommandList(Oid relationId); extern List * InterTableRelationshipOfRelationCommandList(Oid relationId);
extern List * DetachPartitionCommandList(void); extern List * DetachPartitionCommandList(void);
extern void SyncNodeMetadataToNodes(void); extern void SyncNodeMetadataToNodes(void);

View File

@ -376,7 +376,8 @@ extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId); extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRowGlobally(uint64 placementId); extern void DeleteShardPlacementRowGlobally(uint64 placementId);
extern void DeleteShardPlacementRow(uint64 placementId); extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam); extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam,
bool allowFromWorkersIfPostgresTable);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName, extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, int shardCount, char distributionMethod, int shardCount,
bool shardCountIsStrict, char *colocateWithTableName); bool shardCountIsStrict, char *colocateWithTableName);

View File

@ -1483,15 +1483,13 @@ REVOKE CREATE ON DATABASE regression FROM test_non_super_user;
REVOKE CREATE ON SCHEMA public FROM test_non_super_user; REVOKE CREATE ON SCHEMA public FROM test_non_super_user;
DROP ROLE test_non_super_user; DROP ROLE test_non_super_user;
\c - - - :worker_1_port \c - - - :worker_1_port
-- test creating a tenant table from workers
CREATE TABLE tenant_3.tbl_1(a int, b text);
ERROR: cannot create tables in a distributed schema from a worker node
HINT: Connect to the coordinator node and try again.
-- test creating a tenant schema from workers -- test creating a tenant schema from workers
SET citus.enable_schema_based_sharding TO ON; SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA worker_tenant_schema; CREATE SCHEMA worker_tenant_schema;
DROP SCHEMA worker_tenant_schema; -- test creating a tenant table from workers
SET citus.enable_schema_based_sharding TO OFF; SET citus.shard_replication_factor TO 1;
CREATE TABLE worker_tenant_schema.tbl_1(a int, b text);
RESET citus.shard_replication_factor;
-- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
-- commands that we send to workers don't recursively try creating a -- commands that we send to workers don't recursively try creating a
-- tenant schema / table. -- tenant schema / table.
@ -1516,6 +1514,8 @@ SELECT pg_reload_conf();
SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3'); SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
ERROR: schema is expected to be already dropped because this function is only expected to be called from Citus drop hook ERROR: schema is expected to be already dropped because this function is only expected to be called from Citus drop hook
\c - - - :master_port \c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA worker_tenant_schema CASCADE;
SET search_path TO regular_schema; SET search_path TO regular_schema;
SET citus.next_shard_id TO 1950000; SET citus.next_shard_id TO 1950000;
SET citus.shard_count TO 32; SET citus.shard_count TO 32;

View File

@ -1003,14 +1003,14 @@ DROP ROLE test_non_super_user;
\c - - - :worker_1_port \c - - - :worker_1_port
-- test creating a tenant table from workers
CREATE TABLE tenant_3.tbl_1(a int, b text);
-- test creating a tenant schema from workers -- test creating a tenant schema from workers
SET citus.enable_schema_based_sharding TO ON; SET citus.enable_schema_based_sharding TO ON;
CREATE SCHEMA worker_tenant_schema; CREATE SCHEMA worker_tenant_schema;
DROP SCHEMA worker_tenant_schema;
SET citus.enable_schema_based_sharding TO OFF; -- test creating a tenant table from workers
SET citus.shard_replication_factor TO 1;
CREATE TABLE worker_tenant_schema.tbl_1(a int, b text);
RESET citus.shard_replication_factor;
-- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE -- Enable the GUC on workers to make sure that the CREATE SCHEMA/ TABLE
-- commands that we send to workers don't recursively try creating a -- commands that we send to workers don't recursively try creating a
@ -1030,6 +1030,9 @@ SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace
\c - - - :master_port \c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA worker_tenant_schema CASCADE;
SET search_path TO regular_schema; SET search_path TO regular_schema;
SET citus.next_shard_id TO 1950000; SET citus.next_shard_id TO 1950000;
SET citus.shard_count TO 32; SET citus.shard_count TO 32;