mirror of https://github.com/citusdata/citus.git
Refactor CreateDistributedTable() (#6742)
Split the main logic that allows creating a Citus table into the internal function CreateCitusTable(). Old CreateDistributedTable() function was assuming that it's creating a reference table when the distribution method is DISTRIBUTE_BY_NONE. However, soon this won't be the case when adding support for creating single-shard distributed tables because their distribution method would also be the same. Now the internal method CreateCitusTable() doesn't make any assumptions about table's replication model or such. Instead, it expects callers to properly set all such metadata bits. Even more, some of the parameters the old CreateDistributedTable() takes --such as the shard count-- were not meaningful for a reference table, and would be the same as for new table type.pull/6751/head^2
parent
4043abd5aa
commit
d82c11f793
|
@ -1348,8 +1348,7 @@ CreateCitusTableLike(TableConversionState *con)
|
||||||
}
|
}
|
||||||
else if (IsCitusTableType(con->relationId, REFERENCE_TABLE))
|
else if (IsCitusTableType(con->relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false,
|
CreateReferenceTable(con->newRelationId);
|
||||||
NULL);
|
|
||||||
}
|
}
|
||||||
else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE))
|
else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE))
|
||||||
{
|
{
|
||||||
|
|
|
@ -106,11 +106,17 @@ static void CreateDistributedTableConcurrently(Oid relationId,
|
||||||
char *colocateWithTableName,
|
char *colocateWithTableName,
|
||||||
int shardCount,
|
int shardCount,
|
||||||
bool shardCountIsStrict);
|
bool shardCountIsStrict);
|
||||||
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName);
|
static char DecideDistTableReplicationModel(char distributionMethod,
|
||||||
|
char *colocateWithTableName);
|
||||||
static List * HashSplitPointsForShardList(List *shardList);
|
static List * HashSplitPointsForShardList(List *shardList);
|
||||||
static List * HashSplitPointsForShardCount(int shardCount);
|
static List * HashSplitPointsForShardCount(int shardCount);
|
||||||
static List * WorkerNodesForShardList(List *shardList);
|
static List * WorkerNodesForShardList(List *shardList);
|
||||||
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
|
static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength);
|
||||||
|
static void CreateCitusTable(Oid relationId, char *distributionColumnName,
|
||||||
|
char distributionMethod,
|
||||||
|
int shardCount, bool shardCountIsStrict,
|
||||||
|
char *colocateWithTableName,
|
||||||
|
char replicationModel);
|
||||||
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
|
||||||
Oid colocatedTableId, bool localTableEmpty);
|
Oid colocatedTableId, bool localTableEmpty);
|
||||||
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
|
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
|
||||||
|
@ -377,7 +383,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
|
||||||
|
|
||||||
EnsureForeignKeysForDistributedTableConcurrently(relationId);
|
EnsureForeignKeysForDistributedTableConcurrently(relationId);
|
||||||
|
|
||||||
char replicationModel = DecideReplicationModel(distributionMethod,
|
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
|
||||||
colocateWithTableName);
|
colocateWithTableName);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -622,7 +628,7 @@ static void
|
||||||
EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
|
EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
|
||||||
char *distributionColumnName, char *colocateWithTableName)
|
char *distributionColumnName, char *colocateWithTableName)
|
||||||
{
|
{
|
||||||
char replicationModel = DecideReplicationModel(distributionMethod,
|
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
|
||||||
colocateWithTableName);
|
colocateWithTableName);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -860,9 +866,6 @@ create_reference_table(PG_FUNCTION_ARGS)
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
|
||||||
char *colocateWithTableName = NULL;
|
|
||||||
char *distributionColumnName = NULL;
|
|
||||||
|
|
||||||
EnsureCitusTableCanBeCreated(relationId);
|
EnsureCitusTableCanBeCreated(relationId);
|
||||||
|
|
||||||
/* enable create_reference_table on an empty node */
|
/* enable create_reference_table on an empty node */
|
||||||
|
@ -895,8 +898,7 @@ create_reference_table(PG_FUNCTION_ARGS)
|
||||||
errdetail("There are no active worker nodes.")));
|
errdetail("There are no active worker nodes.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE,
|
CreateReferenceTable(relationId);
|
||||||
ShardCount, false, colocateWithTableName);
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -951,17 +953,61 @@ EnsureRelationExists(Oid relationId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateDistributedTable creates distributed table in the given configuration.
|
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
|
||||||
|
* distributed table.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
char distributionMethod,
|
||||||
|
int shardCount, bool shardCountIsStrict,
|
||||||
|
char *colocateWithTableName)
|
||||||
|
{
|
||||||
|
Assert(distributionMethod != DISTRIBUTE_BY_NONE);
|
||||||
|
|
||||||
|
char replicationModel = DecideDistTableReplicationModel(distributionMethod,
|
||||||
|
colocateWithTableName);
|
||||||
|
CreateCitusTable(relationId, distributionColumnName,
|
||||||
|
distributionMethod, shardCount,
|
||||||
|
shardCountIsStrict, colocateWithTableName,
|
||||||
|
replicationModel);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateReferenceTable is a wrapper around CreateCitusTable that creates a
|
||||||
|
* reference table.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CreateReferenceTable(Oid relationId)
|
||||||
|
{
|
||||||
|
char *distributionColumnName = NULL;
|
||||||
|
char distributionMethod = DISTRIBUTE_BY_NONE;
|
||||||
|
int shardCount = 1;
|
||||||
|
bool shardCountIsStrict = true;
|
||||||
|
char *colocateWithTableName = NULL;
|
||||||
|
char replicationModel = REPLICATION_MODEL_2PC;
|
||||||
|
CreateCitusTable(relationId, distributionColumnName,
|
||||||
|
distributionMethod, shardCount,
|
||||||
|
shardCountIsStrict, colocateWithTableName,
|
||||||
|
replicationModel);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateCitusTable is the internal method that creates a Citus table in
|
||||||
|
* given configuration.
|
||||||
|
*
|
||||||
* This functions contains all necessary logic to create distributed tables. It
|
* This functions contains all necessary logic to create distributed tables. It
|
||||||
* performs necessary checks to ensure distributing the table is safe. If it is
|
* performs necessary checks to ensure distributing the table is safe. If it is
|
||||||
* safe to distribute the table, this function creates distributed table metadata,
|
* safe to distribute the table, this function creates distributed table metadata,
|
||||||
* creates shards and copies local data to shards. This function also handles
|
* creates shards and copies local data to shards. This function also handles
|
||||||
* partitioned tables by distributing its partitions as well.
|
* partitioned tables by distributing its partitions as well.
|
||||||
*/
|
*/
|
||||||
void
|
static void
|
||||||
CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
CreateCitusTable(Oid relationId, char *distributionColumnName,
|
||||||
char distributionMethod, int shardCount,
|
char distributionMethod, int shardCount,
|
||||||
bool shardCountIsStrict, char *colocateWithTableName)
|
bool shardCountIsStrict, char *colocateWithTableName,
|
||||||
|
char replicationModel)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* EnsureTableNotDistributed errors out when relation is a citus table but
|
* EnsureTableNotDistributed errors out when relation is a citus table but
|
||||||
|
@ -1022,9 +1068,6 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
|
||||||
PropagatePrerequisiteObjectsForDistributedTable(relationId);
|
PropagatePrerequisiteObjectsForDistributedTable(relationId);
|
||||||
|
|
||||||
char replicationModel = DecideReplicationModel(distributionMethod,
|
|
||||||
colocateWithTableName);
|
|
||||||
|
|
||||||
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
|
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
|
||||||
distributionColumnName,
|
distributionColumnName,
|
||||||
NoLock);
|
NoLock);
|
||||||
|
@ -1420,17 +1463,15 @@ DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DecideReplicationModel function decides which replication model should be
|
* DecideDistTableReplicationModel function decides which replication model should be
|
||||||
* used depending on given distribution configuration.
|
* used for a distributed table depending on given distribution configuration.
|
||||||
*/
|
*/
|
||||||
static char
|
static char
|
||||||
DecideReplicationModel(char distributionMethod, char *colocateWithTableName)
|
DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName)
|
||||||
{
|
{
|
||||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
Assert(distributionMethod != DISTRIBUTE_BY_NONE);
|
||||||
{
|
|
||||||
return REPLICATION_MODEL_2PC;
|
if (!IsColocateWithDefault(colocateWithTableName) &&
|
||||||
}
|
|
||||||
else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
|
|
||||||
!IsColocateWithNone(colocateWithTableName))
|
!IsColocateWithNone(colocateWithTableName))
|
||||||
{
|
{
|
||||||
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
|
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
|
||||||
|
|
|
@ -325,6 +325,7 @@ extern void DeleteShardPlacementRow(uint64 placementId);
|
||||||
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
char distributionMethod, int shardCount,
|
char distributionMethod, int shardCount,
|
||||||
bool shardCountIsStrict, char *colocateWithTableName);
|
bool shardCountIsStrict, char *colocateWithTableName);
|
||||||
|
extern void CreateReferenceTable(Oid relationId);
|
||||||
extern void CreateTruncateTrigger(Oid relationId);
|
extern void CreateTruncateTrigger(Oid relationId);
|
||||||
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue