mirror of https://github.com/citusdata/citus.git
Decide core distribution params in CreateCitusTable (#6760)
Decide core distribution params in CreateCitusTable to reduce the chances of creating Citus tables based on incorrect combinations of the distribution method and replication model params. Also introduce a DistributedTableParams struct to encapsulate the parameters that are specific to distributed tables.

pull/6637/merge
parent cc945fa331
commit f68fc9e69c
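For illustration, here is a minimal standalone sketch of the call shapes this commit introduces. The enum and struct names mirror the diff below; the OID constants, the printf stub, and the main() harness are hypothetical and exist only to make the sketch compile:

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    /* mirrors CitusTableType as used in the diff (values are illustrative) */
    typedef enum
    {
        HASH_DISTRIBUTED,
        APPEND_DISTRIBUTED,
        RANGE_DISTRIBUTED,
        REFERENCE_TABLE
    } CitusTableType;

    /* mirrors the DistributedTableParams struct added by this commit */
    typedef struct
    {
        int shardCount;
        bool shardCountIsStrict;
        char *colocateWithTableName;
        char *distributionColumnName;
    } DistributedTableParams;

    /* stub standing in for the real CreateCitusTable */
    static void
    CreateCitusTable(unsigned int relationId, CitusTableType tableType,
                     DistributedTableParams *distributedTableParams)
    {
        printf("relation %u: tableType=%d, params=%s\n", relationId, tableType,
               distributedTableParams ? "provided" : "none");
    }

    int
    main(void)
    {
        /* reference tables carry no distributed-table params */
        CreateCitusTable(16384, REFERENCE_TABLE, NULL);

        /* distributed tables must pass a DistributedTableParams */
        DistributedTableParams params = {
            .colocateWithTableName = "default",
            .shardCount = 32,
            .shardCountIsStrict = false,
            .distributionColumnName = "user_id"
        };
        CreateCitusTable(16385, HASH_DISTRIBUTED, &params);
        return 0;
    }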
@@ -94,6 +94,28 @@
 #include "utils/syscache.h"
 #include "utils/inval.h"
 
+
+/* common params that apply to all Citus table types */
+typedef struct
+{
+    char distributionMethod;
+    char replicationModel;
+} CitusTableParams;
+
+
+/*
+ * Params that only apply to distributed tables, i.e., the ones that are
+ * known as DISTRIBUTED_TABLE by Citus metadata.
+ */
+typedef struct
+{
+    int shardCount;
+    bool shardCountIsStrict;
+    char *colocateWithTableName;
+    char *distributionColumnName;
+} DistributedTableParams;
+
+
 /*
  * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged.
  */
@@ -112,17 +134,16 @@ static List * HashSplitPointsForShardList(List *shardList);
 static List * HashSplitPointsForShardCount(int shardCount);
 static List * WorkerNodesForShardList(List *shardList);
 static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
-static void CreateCitusTable(Oid relationId, char *distributionColumnName,
-                             char distributionMethod,
-                             int shardCount, bool shardCountIsStrict,
-                             char *colocateWithTableName,
-                             char replicationModel);
+static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
+                                               DistributedTableParams *
+                                               distributedTableParams);
+static void CreateCitusTable(Oid relationId, CitusTableType tableType,
+                             DistributedTableParams *distributedTableParams);
 static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
                                              Oid colocatedTableId, bool localTableEmpty);
-static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
-                                      char distributionMethod, char replicationModel,
-                                      int shardCount, bool shardCountIsStrict,
-                                      char *colocateWithTableName);
+static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
+                                      DistributedTableParams *distributedTableParams,
+                                      Var *distributionColumn);
 static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
                                            char distributionMethod, uint32 colocationId,
                                            char replicationModel);
@@ -962,14 +983,42 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
                        int shardCount, bool shardCountIsStrict,
                        char *colocateWithTableName)
 {
-    Assert(distributionMethod != DISTRIBUTE_BY_NONE);
-
-    char replicationModel = DecideDistTableReplicationModel(distributionMethod,
-                                                            colocateWithTableName);
-    CreateCitusTable(relationId, distributionColumnName,
-                     distributionMethod, shardCount,
-                     shardCountIsStrict, colocateWithTableName,
-                     replicationModel);
+    CitusTableType tableType;
+    switch (distributionMethod)
+    {
+        case DISTRIBUTE_BY_HASH:
+        {
+            tableType = HASH_DISTRIBUTED;
+            break;
+        }
+
+        case DISTRIBUTE_BY_APPEND:
+        {
+            tableType = APPEND_DISTRIBUTED;
+            break;
+        }
+
+        case DISTRIBUTE_BY_RANGE:
+        {
+            tableType = RANGE_DISTRIBUTED;
+            break;
+        }
+
+        default:
+        {
+            ereport(ERROR, (errmsg("unexpected distribution method when "
+                                   "deciding Citus table type")));
+            break;
+        }
+    }
+
+    DistributedTableParams distributedTableParams = {
+        .colocateWithTableName = colocateWithTableName,
+        .shardCount = shardCount,
+        .shardCountIsStrict = shardCountIsStrict,
+        .distributionColumnName = distributionColumnName
+    };
+    CreateCitusTable(relationId, tableType, &distributedTableParams);
 }
@@ -980,16 +1029,7 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 void
 CreateReferenceTable(Oid relationId)
 {
-    char *distributionColumnName = NULL;
-    char distributionMethod = DISTRIBUTE_BY_NONE;
-    int shardCount = 1;
-    bool shardCountIsStrict = true;
-    char *colocateWithTableName = NULL;
-    char replicationModel = REPLICATION_MODEL_2PC;
-
-    CreateCitusTable(relationId, distributionColumnName,
-                     distributionMethod, shardCount,
-                     shardCountIsStrict, colocateWithTableName,
-                     replicationModel);
+    CreateCitusTable(relationId, REFERENCE_TABLE, NULL);
 }
@@ -997,6 +1037,9 @@ CreateReferenceTable(Oid relationId)
  * CreateCitusTable is the internal method that creates a Citus table in
  * given configuration.
  *
+ * DistributedTableParams should be non-null only if we're creating a distributed
+ * table.
+ *
  * This function contains all necessary logic to create distributed tables. It
  * performs necessary checks to ensure distributing the table is safe. If it is
  * safe to distribute the table, this function creates distributed table metadata,
@@ -1004,11 +1047,17 @@ CreateReferenceTable(Oid relationId)
  * partitioned tables by distributing its partitions as well.
  */
 static void
-CreateCitusTable(Oid relationId, char *distributionColumnName,
-                 char distributionMethod, int shardCount,
-                 bool shardCountIsStrict, char *colocateWithTableName,
-                 char replicationModel)
+CreateCitusTable(Oid relationId, CitusTableType tableType,
+                 DistributedTableParams *distributedTableParams)
 {
+    if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED ||
+         tableType == RANGE_DISTRIBUTED) != (distributedTableParams != NULL))
+    {
+        ereport(ERROR, (errmsg("distributed table params must be provided "
+                               "when creating a distributed table and must "
+                               "not be otherwise")));
+    }
+
     /*
      * EnsureTableNotDistributed errors out when relation is a citus table but
      * we don't want to ask user to first undistribute their citus local tables
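The guard added above compares two booleans with !=, which behaves as logical XOR: params must be present exactly when the table type is a distributed one. A standalone sketch of just that predicate, with a hypothetical enum and a plain bool return in place of ereport():

    #include <assert.h>
    #include <stdbool.h>
    #include <stddef.h>

    typedef enum
    {
        HASH_DISTRIBUTED,
        APPEND_DISTRIBUTED,
        RANGE_DISTRIBUTED,
        REFERENCE_TABLE
    } CitusTableType;

    /* returns true when the param/table-type pairing is consistent */
    static bool
    ParamsMatchTableType(CitusTableType tableType, const void *distributedTableParams)
    {
        bool isDistributedType = (tableType == HASH_DISTRIBUTED ||
                                  tableType == APPEND_DISTRIBUTED ||
                                  tableType == RANGE_DISTRIBUTED);

        /* "!=" on two booleans is XOR; equality means the pairing is valid */
        return isDistributedType == (distributedTableParams != NULL);
    }

    int
    main(void)
    {
        int dummyParams = 0;
        assert(ParamsMatchTableType(REFERENCE_TABLE, NULL));          /* valid */
        assert(ParamsMatchTableType(HASH_DISTRIBUTED, &dummyParams)); /* valid */
        assert(!ParamsMatchTableType(REFERENCE_TABLE, &dummyParams)); /* rejected */
        assert(!ParamsMatchTableType(RANGE_DISTRIBUTED, NULL));       /* rejected */
        return 0;
    }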
@@ -1034,11 +1083,8 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
      * that ALTER TABLE hook does the necessary job, which means converting
      * local tables to citus local tables to properly support such foreign
      * keys.
-     *
-     * This function does not expect to create Citus local table, so we blindly
-     * create reference table when the method is DISTRIBUTE_BY_NONE.
      */
-    else if (distributionMethod == DISTRIBUTE_BY_NONE &&
+    else if (tableType == REFERENCE_TABLE &&
              ShouldEnableLocalReferenceForeignKeys() &&
              HasForeignKeyWithLocalTable(relationId))
     {
@@ -1068,21 +1114,29 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
 
     PropagatePrerequisiteObjectsForDistributedTable(relationId);
 
-    Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
-                                                                 distributionColumnName,
-                                                                 NoLock);
+    Var *distributionColumn = NULL;
+    if (distributedTableParams)
+    {
+        distributionColumn = BuildDistributionKeyFromColumnName(relationId,
+                                                                distributedTableParams->
+                                                                distributionColumnName,
+                                                                NoLock);
+    }
+
+    CitusTableParams citusTableParams = DecideCitusTableParams(tableType,
+                                                               distributedTableParams);
 
     /*
      * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case,
      * our caller already acquired lock on relationId.
      */
-    uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
-                                                  distributionMethod, replicationModel,
-                                                  shardCount, shardCountIsStrict,
-                                                  colocateWithTableName);
+    uint32 colocationId = ColocationIdForNewTable(relationId, tableType,
+                                                  distributedTableParams,
+                                                  distributionColumn);
 
-    EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
-                                   colocationId, replicationModel);
+    EnsureRelationCanBeDistributed(relationId, distributionColumn,
+                                   citusTableParams.distributionMethod,
+                                   colocationId, citusTableParams.replicationModel);
 
     /*
      * Make sure that existing reference tables have been replicated to all the nodes
@@ -1111,8 +1165,10 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
     bool autoConverted = false;
 
     /* create an entry for distributed table in pg_dist_partition */
-    InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
-                              colocationId, replicationModel, autoConverted);
+    InsertIntoPgDistPartition(relationId, citusTableParams.distributionMethod,
+                              distributionColumn,
+                              colocationId, citusTableParams.replicationModel,
+                              autoConverted);
 
     /* foreign tables do not support TRUNCATE trigger */
     if (RegularTable(relationId))
@@ -1121,17 +1177,14 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
     }
 
     /* create shards for hash distributed and reference tables */
-    if (distributionMethod == DISTRIBUTE_BY_HASH)
+    if (tableType == HASH_DISTRIBUTED)
     {
-        CreateHashDistributedTableShards(relationId, shardCount, colocatedTableId,
+        CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount,
+                                         colocatedTableId,
                                          localTableEmpty);
     }
-    else if (distributionMethod == DISTRIBUTE_BY_NONE)
+    else if (tableType == REFERENCE_TABLE)
     {
-        /*
-         * This function does not expect to create Citus local table, so we blindly
-         * create reference table when the method is DISTRIBUTE_BY_NONE.
-         */
         CreateReferenceTableShard(relationId);
     }
@@ -1173,9 +1226,14 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
     {
         MemoryContextReset(citusPartitionContext);
 
-        CreateDistributedTable(partitionRelationId, distributionColumnName,
-                               distributionMethod, shardCount, false,
-                               parentRelationName);
+        DistributedTableParams childDistributedTableParams = {
+            .colocateWithTableName = parentRelationName,
+            .shardCount = distributedTableParams->shardCount,
+            .shardCountIsStrict = false,
+            .distributionColumnName = distributedTableParams->distributionColumnName,
+        };
+        CreateCitusTable(partitionRelationId, tableType,
+                         &childDistributedTableParams);
     }
 
     MemoryContextSwitchTo(oldContext);
@@ -1183,8 +1241,7 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
     }
 
     /* copy over data for hash distributed and reference tables */
-    if (distributionMethod == DISTRIBUTE_BY_HASH ||
-        distributionMethod == DISTRIBUTE_BY_NONE)
+    if (tableType == HASH_DISTRIBUTED || tableType == REFERENCE_TABLE)
     {
         if (RegularTable(relationId))
         {
@@ -1203,6 +1260,70 @@ CreateCitusTable(Oid relationId, char *distributionColumnName,
 }
 
 
+/*
+ * DecideCitusTableParams decides CitusTableParams based on given CitusTableType
+ * and DistributedTableParams if it's a distributed table.
+ *
+ * DistributedTableParams should be non-null only if CitusTableType corresponds
+ * to a distributed table.
+ */
+static CitusTableParams
+DecideCitusTableParams(CitusTableType tableType,
+                       DistributedTableParams *distributedTableParams)
+{
+    CitusTableParams citusTableParams = { 0 };
+
+    switch (tableType)
+    {
+        case HASH_DISTRIBUTED:
+        {
+            citusTableParams.distributionMethod = DISTRIBUTE_BY_HASH;
+            citusTableParams.replicationModel =
+                DecideDistTableReplicationModel(DISTRIBUTE_BY_HASH,
+                                                distributedTableParams->
+                                                colocateWithTableName);
+            break;
+        }
+
+        case APPEND_DISTRIBUTED:
+        {
+            citusTableParams.distributionMethod = DISTRIBUTE_BY_APPEND;
+            citusTableParams.replicationModel =
+                DecideDistTableReplicationModel(APPEND_DISTRIBUTED,
+                                                distributedTableParams->
+                                                colocateWithTableName);
+            break;
+        }
+
+        case RANGE_DISTRIBUTED:
+        {
+            citusTableParams.distributionMethod = DISTRIBUTE_BY_RANGE;
+            citusTableParams.replicationModel =
+                DecideDistTableReplicationModel(RANGE_DISTRIBUTED,
+                                                distributedTableParams->
+                                                colocateWithTableName);
+            break;
+        }
+
+        case REFERENCE_TABLE:
+        {
+            citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE;
+            citusTableParams.replicationModel = REPLICATION_MODEL_2PC;
+            break;
+        }
+
+        default:
+        {
+            ereport(ERROR, (errmsg("unexpected table type when deciding Citus "
+                                   "table params")));
+            break;
+        }
+    }
+
+    return citusTableParams;
+}
+
+
 /*
  * PropagatePrerequisiteObjectsForDistributedTable ensures we can create shards
  * on all nodes by ensuring all dependent objects exist on all nodes.
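The value of routing every caller through this mapping is that invalid pairs, such as a reference table with streaming replication, can no longer be constructed. A deliberately condensed, hypothetical miniature of the mapping with stand-in character codes (the real DISTRIBUTE_BY_* and REPLICATION_MODEL_* constants live in Citus headers, and the distributed cases actually defer to DecideDistTableReplicationModel):

    #include <assert.h>

    typedef enum { HASH_DISTRIBUTED, REFERENCE_TABLE } CitusTableType;
    typedef struct { char distributionMethod; char replicationModel; } CitusTableParams;

    /* stand-in codes, assumed for this sketch only */
    #define DISTRIBUTE_BY_HASH 'h'
    #define DISTRIBUTE_BY_NONE 'n'
    #define REPLICATION_MODEL_2PC 't'
    #define REPLICATION_MODEL_STREAMING 's'

    /* condensed decision: reference tables always pair DISTRIBUTE_BY_NONE
     * with two-phase commit replication */
    static CitusTableParams
    DecideParamsSketch(CitusTableType tableType)
    {
        CitusTableParams params = { 0 };
        if (tableType == REFERENCE_TABLE)
        {
            params.distributionMethod = DISTRIBUTE_BY_NONE;
            params.replicationModel = REPLICATION_MODEL_2PC;
        }
        else
        {
            params.distributionMethod = DISTRIBUTE_BY_HASH;
            params.replicationModel = REPLICATION_MODEL_STREAMING;
        }
        return params;
    }

    int
    main(void)
    {
        /* invalid pairs such as (DISTRIBUTE_BY_NONE, STREAMING) cannot arise */
        CitusTableParams p = DecideParamsSketch(REFERENCE_TABLE);
        assert(p.distributionMethod == DISTRIBUTE_BY_NONE);
        assert(p.replicationModel == REPLICATION_MODEL_2PC);
        return 0;
    }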
@@ -1547,28 +1668,34 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
 
 
 /*
- * ColocationIdForNewTable returns a colocation id for hash-distributed table
+ * ColocationIdForNewTable returns a colocation id for given table
  * according to given configuration. If there is no such configuration, it
  * creates one and returns colocation id of the newly created colocation group.
+ * Note that DistributedTableParams and the distribution column Var should be
+ * non-null only if CitusTableType corresponds to a distributed table.
  *
  * For append and range distributed tables, this function errors out if
  * colocateWithTableName parameter is not NULL, otherwise directly returns
  * INVALID_COLOCATION_ID.
  *
+ * For reference tables, returns the common reference table colocation id.
+ *
  * This function assumes its caller takes necessary lock on relationId to
  * prevent possible changes on it.
  */
 static uint32
-ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
-                        char distributionMethod, char replicationModel,
-                        int shardCount, bool shardCountIsStrict,
-                        char *colocateWithTableName)
+ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
+                        DistributedTableParams *distributedTableParams,
+                        Var *distributionColumn)
 {
+    CitusTableParams citusTableParams = DecideCitusTableParams(tableType,
+                                                               distributedTableParams);
+
     uint32 colocationId = INVALID_COLOCATION_ID;
 
-    if (distributionMethod == DISTRIBUTE_BY_APPEND ||
-        distributionMethod == DISTRIBUTE_BY_RANGE)
+    if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED)
     {
-        if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
+        if (!IsColocateWithDefault(distributedTableParams->colocateWithTableName))
         {
             ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("cannot distribute relation"),
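The doc comment above describes a three-way decision: append/range tables reject explicit colocation and get no colocation group, reference tables share one common group, and hash-distributed tables find or create a matching group. A condensed, hypothetical sketch of that flow (IsColocateWithDefault is simplified, the returned group ids are stand-ins, and the real function raises an ereport() error and consults pg_dist_colocation):

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>
    #include <strings.h>   /* POSIX strcasecmp */

    typedef enum
    {
        HASH_DISTRIBUTED,
        APPEND_DISTRIBUTED,
        RANGE_DISTRIBUTED,
        REFERENCE_TABLE
    } CitusTableType;

    #define INVALID_COLOCATION_ID 0

    /* simplified stand-in for Citus' IsColocateWithDefault() */
    static bool
    IsColocateWithDefault(const char *colocateWithTableName)
    {
        return colocateWithTableName == NULL ||
               strcasecmp(colocateWithTableName, "default") == 0;
    }

    static unsigned int
    ColocationIdSketch(CitusTableType tableType, const char *colocateWithTableName)
    {
        if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED)
        {
            if (!IsColocateWithDefault(colocateWithTableName))
            {
                fprintf(stderr, "cannot distribute relation\n"); /* errors out */
            }
            return INVALID_COLOCATION_ID; /* append/range tables are not colocated */
        }

        if (tableType == REFERENCE_TABLE)
        {
            return 1; /* stand-in for the common reference table colocation group */
        }

        return 2; /* hash-distributed: find or create a matching colocation group */
    }

    int
    main(void)
    {
        printf("%u\n", ColocationIdSketch(REFERENCE_TABLE, NULL));       /* common group */
        printf("%u\n", ColocationIdSketch(APPEND_DISTRIBUTED, NULL));    /* invalid id */
        printf("%u\n", ColocationIdSketch(HASH_DISTRIBUTED, "default")); /* found/created */
        return 0;
    }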
@@ -1578,7 +1705,7 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
 
         return colocationId;
     }
-    else if (distributionMethod == DISTRIBUTE_BY_NONE)
+    else if (tableType == REFERENCE_TABLE)
     {
         return CreateReferenceTableColocationId();
     }
@@ -1589,27 +1716,29 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
      * can be sure that there will be no modifications on the colocation table
      * until this transaction is committed.
      */
-    Assert(distributionMethod == DISTRIBUTE_BY_HASH);
+    Assert(citusTableParams.distributionMethod == DISTRIBUTE_BY_HASH);
 
     Oid distributionColumnType = distributionColumn->vartype;
     Oid distributionColumnCollation = get_typcollation(distributionColumnType);
 
     /* get an advisory lock to serialize concurrent default group creations */
-    if (IsColocateWithDefault(colocateWithTableName))
+    if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
     {
         AcquireColocationDefaultLock();
     }
 
     colocationId = FindColocateWithColocationId(relationId,
-                                                replicationModel,
+                                                citusTableParams.replicationModel,
                                                 distributionColumnType,
                                                 distributionColumnCollation,
-                                                shardCount,
-                                                shardCountIsStrict,
-                                                colocateWithTableName);
+                                                distributedTableParams->shardCount,
+                                                distributedTableParams->
+                                                shardCountIsStrict,
+                                                distributedTableParams->
+                                                colocateWithTableName);
 
-    if (IsColocateWithDefault(colocateWithTableName) && (colocationId !=
-                                                         INVALID_COLOCATION_ID))
+    if (IsColocateWithDefault(distributedTableParams->colocateWithTableName) &&
+        (colocationId != INVALID_COLOCATION_ID))
     {
         /*
          * we can release advisory lock if there is already a default entry for given params;
@@ -1621,23 +1750,25 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
 
     if (colocationId == INVALID_COLOCATION_ID)
     {
-        if (IsColocateWithDefault(colocateWithTableName))
+        if (IsColocateWithDefault(distributedTableParams->colocateWithTableName))
         {
             /*
              * Generate a new colocation ID and insert a pg_dist_colocation
              * record.
              */
-            colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
+            colocationId = CreateColocationGroup(distributedTableParams->shardCount,
+                                                 ShardReplicationFactor,
                                                  distributionColumnType,
                                                  distributionColumnCollation);
         }
-        else if (IsColocateWithNone(colocateWithTableName))
+        else if (IsColocateWithNone(distributedTableParams->colocateWithTableName))
        {
             /*
              * Generate a new colocation ID and insert a pg_dist_colocation
              * record.
              */
-            colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
+            colocationId = CreateColocationGroup(distributedTableParams->shardCount,
+                                                 ShardReplicationFactor,
                                                  distributionColumnType,
                                                  distributionColumnCollation);
         }