diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 0592cb762..f51b62535 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1348,8 +1348,7 @@ CreateCitusTableLike(TableConversionState *con) } else if (IsCitusTableType(con->relationId, REFERENCE_TABLE)) { - CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false, - NULL); + CreateReferenceTable(con->newRelationId); } else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE)) { diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 86133322d..101d866f0 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -106,11 +106,17 @@ static void CreateDistributedTableConcurrently(Oid relationId, char *colocateWithTableName, int shardCount, bool shardCountIsStrict); -static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName); +static char DecideDistTableReplicationModel(char distributionMethod, + char *colocateWithTableName); static List * HashSplitPointsForShardList(List *shardList); static List * HashSplitPointsForShardCount(int shardCount); static List * WorkerNodesForShardList(List *shardList); static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength); +static void CreateCitusTable(Oid relationId, char *distributionColumnName, + char distributionMethod, + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName, + char replicationModel); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, @@ -377,8 +383,8 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, EnsureForeignKeysForDistributedTableConcurrently(relationId); - char replicationModel = DecideReplicationModel(distributionMethod, - colocateWithTableName); + char replicationModel = DecideDistTableReplicationModel(distributionMethod, + colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with @@ -622,8 +628,8 @@ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod, char *distributionColumnName, char *colocateWithTableName) { - char replicationModel = DecideReplicationModel(distributionMethod, - colocateWithTableName); + char replicationModel = DecideDistTableReplicationModel(distributionMethod, + colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with @@ -860,9 +866,6 @@ create_reference_table(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); - char *colocateWithTableName = NULL; - char *distributionColumnName = NULL; - EnsureCitusTableCanBeCreated(relationId); /* enable create_reference_table on an empty node */ @@ -895,8 +898,7 @@ create_reference_table(PG_FUNCTION_ARGS) errdetail("There are no active worker nodes."))); } - CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE, - ShardCount, false, colocateWithTableName); + CreateReferenceTable(relationId); PG_RETURN_VOID(); } @@ -951,17 +953,61 @@ EnsureRelationExists(Oid relationId) /* - * CreateDistributedTable creates distributed table in the given configuration. + * CreateReferenceTable is a wrapper around CreateCitusTable that creates a + * distributed table. + */ +void +CreateDistributedTable(Oid relationId, char *distributionColumnName, + char distributionMethod, + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName) +{ + Assert(distributionMethod != DISTRIBUTE_BY_NONE); + + char replicationModel = DecideDistTableReplicationModel(distributionMethod, + colocateWithTableName); + CreateCitusTable(relationId, distributionColumnName, + distributionMethod, shardCount, + shardCountIsStrict, colocateWithTableName, + replicationModel); +} + + +/* + * CreateReferenceTable is a wrapper around CreateCitusTable that creates a + * reference table. + */ +void +CreateReferenceTable(Oid relationId) +{ + char *distributionColumnName = NULL; + char distributionMethod = DISTRIBUTE_BY_NONE; + int shardCount = 1; + bool shardCountIsStrict = true; + char *colocateWithTableName = NULL; + char replicationModel = REPLICATION_MODEL_2PC; + CreateCitusTable(relationId, distributionColumnName, + distributionMethod, shardCount, + shardCountIsStrict, colocateWithTableName, + replicationModel); +} + + +/* + * CreateCitusTable is the internal method that creates a Citus table in + * given configuration. + * * This functions contains all necessary logic to create distributed tables. It * performs necessary checks to ensure distributing the table is safe. If it is * safe to distribute the table, this function creates distributed table metadata, * creates shards and copies local data to shards. This function also handles * partitioned tables by distributing its partitions as well. */ -void -CreateDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, int shardCount, - bool shardCountIsStrict, char *colocateWithTableName) +static void +CreateCitusTable(Oid relationId, char *distributionColumnName, + char distributionMethod, int shardCount, + bool shardCountIsStrict, char *colocateWithTableName, + char replicationModel) { /* * EnsureTableNotDistributed errors out when relation is a citus table but @@ -1022,9 +1068,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, PropagatePrerequisiteObjectsForDistributedTable(relationId); - char replicationModel = DecideReplicationModel(distributionMethod, - colocateWithTableName); - Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, distributionColumnName, NoLock); @@ -1420,18 +1463,16 @@ DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag) /* - * DecideReplicationModel function decides which replication model should be - * used depending on given distribution configuration. + * DecideDistTableReplicationModel function decides which replication model should be + * used for a distributed table depending on given distribution configuration. */ static char -DecideReplicationModel(char distributionMethod, char *colocateWithTableName) +DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName) { - if (distributionMethod == DISTRIBUTE_BY_NONE) - { - return REPLICATION_MODEL_2PC; - } - else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && - !IsColocateWithNone(colocateWithTableName)) + Assert(distributionMethod != DISTRIBUTE_BY_NONE); + + if (!IsColocateWithDefault(colocateWithTableName) && + !IsColocateWithNone(colocateWithTableName)) { text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index ceea51678..acb4ae5da 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -325,6 +325,7 @@ extern void DeleteShardPlacementRow(uint64 placementId); extern void CreateDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, int shardCount, bool shardCountIsStrict, char *colocateWithTableName); +extern void CreateReferenceTable(Oid relationId); extern void CreateTruncateTrigger(Oid relationId); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);