diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 101d866f0..e38395296 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -94,6 +94,28 @@ #include "utils/syscache.h" #include "utils/inval.h" + +/* common params that apply to all Citus table types */ +typedef struct +{ + char distributionMethod; + char replicationModel; +} CitusTableParams; + + +/* + * Params that only apply to distributed tables, i.e., the ones that are + * known as DISTRIBUTED_TABLE by Citus metadata. + */ +typedef struct +{ + int shardCount; + bool shardCountIsStrict; + char *colocateWithTableName; + char *distributionColumnName; +} DistributedTableParams; + + /* * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. */ @@ -112,17 +134,16 @@ static List * HashSplitPointsForShardList(List *shardList); static List * HashSplitPointsForShardCount(int shardCount); static List * WorkerNodesForShardList(List *shardList); static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength); -static void CreateCitusTable(Oid relationId, char *distributionColumnName, - char distributionMethod, - int shardCount, bool shardCountIsStrict, - char *colocateWithTableName, - char replicationModel); +static CitusTableParams DecideCitusTableParams(CitusTableType tableType, + DistributedTableParams * + distributedTableParams); +static void CreateCitusTable(Oid relationId, CitusTableType tableType, + DistributedTableParams *distributedTableParams); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); -static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, - char distributionMethod, char replicationModel, - int shardCount, bool shardCountIsStrict, - char *colocateWithTableName); +static uint32 ColocationIdForNewTable(Oid relationId, 
CitusTableType tableType, + DistributedTableParams *distributedTableParams, + Var *distributionColumn); static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, char distributionMethod, uint32 colocationId, char replicationModel); @@ -962,14 +983,42 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, int shardCount, bool shardCountIsStrict, char *colocateWithTableName) { - Assert(distributionMethod != DISTRIBUTE_BY_NONE); + CitusTableType tableType; + switch (distributionMethod) + { + case DISTRIBUTE_BY_HASH: + { + tableType = HASH_DISTRIBUTED; + break; + } - char replicationModel = DecideDistTableReplicationModel(distributionMethod, - colocateWithTableName); - CreateCitusTable(relationId, distributionColumnName, - distributionMethod, shardCount, - shardCountIsStrict, colocateWithTableName, - replicationModel); + case DISTRIBUTE_BY_APPEND: + { + tableType = APPEND_DISTRIBUTED; + break; + } + + case DISTRIBUTE_BY_RANGE: + { + tableType = RANGE_DISTRIBUTED; + break; + } + + default: + { + ereport(ERROR, (errmsg("unexpected distribution method when " + "deciding Citus table type"))); + break; + } + } + + DistributedTableParams distributedTableParams = { + .colocateWithTableName = colocateWithTableName, + .shardCount = shardCount, + .shardCountIsStrict = shardCountIsStrict, + .distributionColumnName = distributionColumnName + }; + CreateCitusTable(relationId, tableType, &distributedTableParams); } @@ -980,16 +1029,7 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName, void CreateReferenceTable(Oid relationId) { - char *distributionColumnName = NULL; - char distributionMethod = DISTRIBUTE_BY_NONE; - int shardCount = 1; - bool shardCountIsStrict = true; - char *colocateWithTableName = NULL; - char replicationModel = REPLICATION_MODEL_2PC; - CreateCitusTable(relationId, distributionColumnName, - distributionMethod, shardCount, - shardCountIsStrict, colocateWithTableName, - replicationModel); + 
CreateCitusTable(relationId, REFERENCE_TABLE, NULL); } @@ -997,6 +1037,9 @@ CreateReferenceTable(Oid relationId) * CreateCitusTable is the internal method that creates a Citus table in * given configuration. * + * DistributedTableParams should be non-null only if we're creating a distributed + * table. + * * This functions contains all necessary logic to create distributed tables. It * performs necessary checks to ensure distributing the table is safe. If it is * safe to distribute the table, this function creates distributed table metadata, @@ -1004,11 +1047,17 @@ CreateReferenceTable(Oid relationId) * partitioned tables by distributing its partitions as well. */ static void -CreateCitusTable(Oid relationId, char *distributionColumnName, - char distributionMethod, int shardCount, - bool shardCountIsStrict, char *colocateWithTableName, - char replicationModel) +CreateCitusTable(Oid relationId, CitusTableType tableType, + DistributedTableParams *distributedTableParams) { + if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED || + tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL)) + { + ereport(ERROR, (errmsg("distributed table params must be provided " + "when creating a distributed table and must " + "not be otherwise"))); + } + /* * EnsureTableNotDistributed errors out when relation is a citus table but * we don't want to ask user to first undistribute their citus local tables @@ -1034,11 +1083,8 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, * that ALTER TABLE hook does the necessary job, which means converting * local tables to citus local tables to properly support such foreign * keys. - * - * This function does not expect to create Citus local table, so we blindly - * create reference table when the method is DISTRIBUTE_BY_NONE. 
*/ - else if (distributionMethod == DISTRIBUTE_BY_NONE && + else if (tableType == REFERENCE_TABLE && ShouldEnableLocalReferenceForeignKeys() && HasForeignKeyWithLocalTable(relationId)) { @@ -1068,21 +1114,29 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, PropagatePrerequisiteObjectsForDistributedTable(relationId); - Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, - distributionColumnName, - NoLock); + Var *distributionColumn = NULL; + if (distributedTableParams) + { + distributionColumn = BuildDistributionKeyFromColumnName(relationId, + distributedTableParams-> + distributionColumnName, + NoLock); + } + + CitusTableParams citusTableParams = DecideCitusTableParams(tableType, + distributedTableParams); /* * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case, * our caller already acquired lock on relationId. */ - uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn, - distributionMethod, replicationModel, - shardCount, shardCountIsStrict, - colocateWithTableName); + uint32 colocationId = ColocationIdForNewTable(relationId, tableType, + distributedTableParams, + distributionColumn); - EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod, - colocationId, replicationModel); + EnsureRelationCanBeDistributed(relationId, distributionColumn, + citusTableParams.distributionMethod, + colocationId, citusTableParams.replicationModel); /* * Make sure that existing reference tables have been replicated to all the nodes @@ -1111,8 +1165,10 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, bool autoConverted = false; /* create an entry for distributed table in pg_dist_partition */ - InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, - colocationId, replicationModel, autoConverted); + InsertIntoPgDistPartition(relationId, citusTableParams.distributionMethod, + distributionColumn, + colocationId, 
citusTableParams.replicationModel, + autoConverted); /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) @@ -1121,17 +1177,14 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, } /* create shards for hash distributed and reference tables */ - if (distributionMethod == DISTRIBUTE_BY_HASH) + if (tableType == HASH_DISTRIBUTED) { - CreateHashDistributedTableShards(relationId, shardCount, colocatedTableId, + CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, + colocatedTableId, localTableEmpty); } - else if (distributionMethod == DISTRIBUTE_BY_NONE) + else if (tableType == REFERENCE_TABLE) { - /* - * This function does not expect to create Citus local table, so we blindly - * create reference table when the method is DISTRIBUTE_BY_NONE. - */ CreateReferenceTableShard(relationId); } @@ -1173,9 +1226,14 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, { MemoryContextReset(citusPartitionContext); - CreateDistributedTable(partitionRelationId, distributionColumnName, - distributionMethod, shardCount, false, - parentRelationName); + DistributedTableParams childDistributedTableParams = { + .colocateWithTableName = parentRelationName, + .shardCount = distributedTableParams->shardCount, + .shardCountIsStrict = false, + .distributionColumnName = distributedTableParams->distributionColumnName, + }; + CreateCitusTable(partitionRelationId, tableType, + &childDistributedTableParams); } MemoryContextSwitchTo(oldContext); @@ -1183,8 +1241,7 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, } /* copy over data for hash distributed and reference tables */ - if (distributionMethod == DISTRIBUTE_BY_HASH || - distributionMethod == DISTRIBUTE_BY_NONE) + if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE) { if (RegularTable(relationId)) { @@ -1203,6 +1260,70 @@ CreateCitusTable(Oid relationId, char *distributionColumnName, } +/* + * DecideCitusTableParams decides 
CitusTableParams based on given CitusTableType + and DistributedTableParams if it's a distributed table. + * + * DistributedTableParams should be non-null only if CitusTableType corresponds + * to a distributed table. + */ +static +CitusTableParams +DecideCitusTableParams(CitusTableType tableType, + DistributedTableParams *distributedTableParams) +{ + CitusTableParams citusTableParams = { 0 }; + switch (tableType) + { + case HASH_DISTRIBUTED: + { + citusTableParams.distributionMethod = DISTRIBUTE_BY_HASH; + citusTableParams.replicationModel = + DecideDistTableReplicationModel(DISTRIBUTE_BY_HASH, + distributedTableParams-> + colocateWithTableName); + break; + } + + case APPEND_DISTRIBUTED: + { + citusTableParams.distributionMethod = DISTRIBUTE_BY_APPEND; + citusTableParams.replicationModel = + DecideDistTableReplicationModel(DISTRIBUTE_BY_APPEND, + distributedTableParams-> + colocateWithTableName); + break; + } + + case RANGE_DISTRIBUTED: + { + citusTableParams.distributionMethod = DISTRIBUTE_BY_RANGE; + citusTableParams.replicationModel = + DecideDistTableReplicationModel(DISTRIBUTE_BY_RANGE, + distributedTableParams-> + colocateWithTableName); + break; + } + + case REFERENCE_TABLE: + { + citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE; + citusTableParams.replicationModel = REPLICATION_MODEL_2PC; + break; + } + + default: + { + ereport(ERROR, (errmsg("unexpected table type when deciding Citus " + "table params"))); + break; + } + } + + return citusTableParams; +} + + /* * PropagatePrerequisiteObjectsForDistributedTable ensures we can create shards * on all nodes by ensuring all dependent objects exist on all node. */ @@ -1547,28 +1668,34 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, /* - * ColocationIdForNewTable returns a colocation id for hash-distributed table + * ColocationIdForNewTable returns a colocation id for given table * according to given configuration. 
If there is no such configuration, it * creates one and returns colocation id of newly the created colocation group. + * Note that DistributedTableParams and the distribution column Var should be + * non-null only if CitusTableType corresponds to a distributed table. + * * For append and range distributed tables, this function errors out if * colocateWithTableName parameter is not NULL, otherwise directly returns * INVALID_COLOCATION_ID. * + * For reference tables, returns the common reference table colocation id. + * * This function assumes its caller take necessary lock on relationId to * prevent possible changes on it. */ static uint32 -ColocationIdForNewTable(Oid relationId, Var *distributionColumn, - char distributionMethod, char replicationModel, - int shardCount, bool shardCountIsStrict, - char *colocateWithTableName) +ColocationIdForNewTable(Oid relationId, CitusTableType tableType, + DistributedTableParams *distributedTableParams, + Var *distributionColumn) { + CitusTableParams citusTableParams = DecideCitusTableParams(tableType, + distributedTableParams); + uint32 colocationId = INVALID_COLOCATION_ID; - if (distributionMethod == DISTRIBUTE_BY_APPEND || - distributionMethod == DISTRIBUTE_BY_RANGE) + if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED) { - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + if (!IsColocateWithDefault(distributedTableParams->colocateWithTableName)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation"), @@ -1578,7 +1705,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, return colocationId; } - else if (distributionMethod == DISTRIBUTE_BY_NONE) + else if (tableType == REFERENCE_TABLE) { return CreateReferenceTableColocationId(); } @@ -1589,27 +1716,29 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, * can be sure that there will no modifications on the colocation table * until this transaction is committed. 
*/ - Assert(distributionMethod == DISTRIBUTE_BY_HASH); + Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH); Oid distributionColumnType = distributionColumn->vartype; Oid distributionColumnCollation = get_typcollation(distributionColumnType); /* get an advisory lock to serialize concurrent default group creations */ - if (IsColocateWithDefault(colocateWithTableName)) + if (IsColocateWithDefault(distributedTableParams->colocateWithTableName)) { AcquireColocationDefaultLock(); } colocationId = FindColocateWithColocationId(relationId, - replicationModel, + citusTableParams.replicationModel, distributionColumnType, distributionColumnCollation, - shardCount, + distributedTableParams->shardCount, + distributedTableParams-> shardCountIsStrict, + distributedTableParams-> colocateWithTableName); - if (IsColocateWithDefault(colocateWithTableName) && (colocationId != - INVALID_COLOCATION_ID)) + if (IsColocateWithDefault(distributedTableParams->colocateWithTableName) && + (colocationId != INVALID_COLOCATION_ID)) { /* * we can release advisory lock if there is already a default entry for given params; @@ -1621,23 +1750,25 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, if (colocationId == INVALID_COLOCATION_ID) { - if (IsColocateWithDefault(colocateWithTableName)) + if (IsColocateWithDefault(distributedTableParams->colocateWithTableName)) { /* * Generate a new colocation ID and insert a pg_dist_colocation * record. */ - colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor, + colocationId = CreateColocationGroup(distributedTableParams->shardCount, + ShardReplicationFactor, distributionColumnType, distributionColumnCollation); } - else if (IsColocateWithNone(colocateWithTableName)) + else if (IsColocateWithNone(distributedTableParams->colocateWithTableName)) { /* * Generate a new colocation ID and insert a pg_dist_colocation * record. 
*/ - colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor, + colocationId = CreateColocationGroup(distributedTableParams->shardCount, + ShardReplicationFactor, distributionColumnType, distributionColumnCollation); }