mirror of https://github.com/citusdata/citus.git
Refactor distribution column type check for colocation
parent
d43a01ebae
commit
fc908a3ab6
|
@ -926,24 +926,11 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Var *colocationTablePartitionColumn = NULL;
|
|
||||||
Oid colocationTablePartitionColumnType = InvalidOid;
|
|
||||||
|
|
||||||
/* get colocation group of the target table */
|
/* get colocation group of the target table */
|
||||||
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
|
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
|
||||||
sourceRelationId = ResolveRelationId(colocateWithTableNameText);
|
sourceRelationId = ResolveRelationId(colocateWithTableNameText);
|
||||||
|
|
||||||
colocationId = TableColocationId(sourceRelationId);
|
colocationId = TableColocationId(sourceRelationId);
|
||||||
|
|
||||||
colocationTablePartitionColumn = PartitionKey(sourceRelationId);
|
|
||||||
colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype;
|
|
||||||
|
|
||||||
if (colocationTablePartitionColumnType != distributionColumnType)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
||||||
errmsg("cannot colocate with %s", colocateWithTableName),
|
|
||||||
errdetail("Distribution column types are different.")));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* create distributed table metadata */
|
/* create distributed table metadata */
|
||||||
|
@ -953,7 +940,9 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
/* create shards */
|
/* create shards */
|
||||||
if (sourceRelationId != InvalidOid)
|
if (sourceRelationId != InvalidOid)
|
||||||
{
|
{
|
||||||
|
/* first run checks */
|
||||||
CheckReplicationModel(sourceRelationId, relationId);
|
CheckReplicationModel(sourceRelationId, relationId);
|
||||||
|
CheckDistributionColumnType(sourceRelationId, relationId);
|
||||||
|
|
||||||
CreateColocatedShards(relationId, sourceRelationId);
|
CreateColocatedShards(relationId, sourceRelationId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,8 +76,6 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
|
Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
|
||||||
|
|
||||||
CheckReplicationModel(sourceRelationId, nextRelationOid);
|
|
||||||
|
|
||||||
MarkTablesColocated(sourceRelationId, nextRelationOid);
|
MarkTablesColocated(sourceRelationId, nextRelationOid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,31 +95,12 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
uint32 sourceColocationId = INVALID_COLOCATION_ID;
|
uint32 sourceColocationId = INVALID_COLOCATION_ID;
|
||||||
uint32 targetColocationId = INVALID_COLOCATION_ID;
|
uint32 targetColocationId = INVALID_COLOCATION_ID;
|
||||||
Relation pgDistColocation = NULL;
|
Relation pgDistColocation = NULL;
|
||||||
Var *sourceDistributionColumn = NULL;
|
|
||||||
Var *targetDistributionColumn = NULL;
|
|
||||||
Oid sourceDistributionColumnType = InvalidOid;
|
|
||||||
Oid targetDistributionColumnType = InvalidOid;
|
|
||||||
|
|
||||||
CheckHashPartitionedTable(sourceRelationId);
|
CheckHashPartitionedTable(sourceRelationId);
|
||||||
CheckHashPartitionedTable(targetRelationId);
|
CheckHashPartitionedTable(targetRelationId);
|
||||||
|
|
||||||
sourceDistributionColumn = PartitionKey(sourceRelationId);
|
CheckReplicationModel(sourceRelationId, targetRelationId);
|
||||||
sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
CheckDistributionColumnType(sourceRelationId, targetRelationId);
|
||||||
|
|
||||||
targetDistributionColumn = PartitionKey(targetRelationId);
|
|
||||||
targetDistributionColumnType = targetDistributionColumn->vartype;
|
|
||||||
|
|
||||||
if (sourceDistributionColumnType != targetDistributionColumnType)
|
|
||||||
{
|
|
||||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
|
||||||
char *targetRelationName = get_rel_name(targetRelationId);
|
|
||||||
|
|
||||||
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
|
|
||||||
sourceRelationName, targetRelationName),
|
|
||||||
errdetail("Distribution column types don't match for "
|
|
||||||
"%s and %s.", sourceRelationName,
|
|
||||||
targetRelationName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
||||||
|
@ -143,6 +122,9 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
|
||||||
uint32 shardCount = ShardIntervalCount(sourceRelationId);
|
uint32 shardCount = ShardIntervalCount(sourceRelationId);
|
||||||
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
|
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
|
||||||
|
|
||||||
|
Var *sourceDistributionColumn = PartitionKey(sourceRelationId);
|
||||||
|
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
||||||
|
|
||||||
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
|
||||||
sourceDistributionColumnType);
|
sourceDistributionColumnType);
|
||||||
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
|
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
|
||||||
|
@ -487,8 +469,8 @@ GetNextColocationId()
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CheckReplicationModel checks if given relation and colocation group are from
|
* CheckReplicationModel checks if given relations are from the same
|
||||||
* the same replication model. Otherwise, it errors out.
|
* replication model. Otherwise, it errors out.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
||||||
|
@ -506,10 +488,45 @@ CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
||||||
|
|
||||||
if (sourceReplicationModel != targetReplicationModel)
|
if (sourceReplicationModel != targetReplicationModel)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||||
errmsg("cannot create colocation"),
|
char *targetRelationName = get_rel_name(targetRelationId);
|
||||||
errdetail("Colocating tables with different replication "
|
|
||||||
"models is not supported.")));
|
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
|
||||||
|
sourceRelationName, targetRelationName),
|
||||||
|
errdetail("Replication models don't match for %s and %s.",
|
||||||
|
sourceRelationName, targetRelationName)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CheckDistributionColumnType checks if distribution column types of relations
|
||||||
|
* are same. Otherwise, it errors out.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
|
||||||
|
{
|
||||||
|
Var *sourceDistributionColumn = NULL;
|
||||||
|
Var *targetDistributionColumn = NULL;
|
||||||
|
Oid sourceDistributionColumnType = InvalidOid;
|
||||||
|
Oid targetDistributionColumnType = InvalidOid;
|
||||||
|
|
||||||
|
sourceDistributionColumn = PartitionKey(sourceRelationId);
|
||||||
|
sourceDistributionColumnType = sourceDistributionColumn->vartype;
|
||||||
|
|
||||||
|
targetDistributionColumn = PartitionKey(targetRelationId);
|
||||||
|
targetDistributionColumnType = targetDistributionColumn->vartype;
|
||||||
|
|
||||||
|
if (sourceDistributionColumnType != targetDistributionColumnType)
|
||||||
|
{
|
||||||
|
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||||
|
char *targetRelationName = get_rel_name(targetRelationId);
|
||||||
|
|
||||||
|
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
|
||||||
|
sourceRelationName, targetRelationName),
|
||||||
|
errdetail("Distribution column types don't match for "
|
||||||
|
"%s and %s.", sourceRelationName,
|
||||||
|
targetRelationName)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -854,7 +871,7 @@ DeleteColocationGroup(uint32 colocationId)
|
||||||
scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
|
scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
|
||||||
NULL, scanKeyCount, scanKey);
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
/* if a record id found, delete it */
|
/* if a record is found, delete it */
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
if (HeapTupleIsValid(heapTuple))
|
if (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
|
|
|
@ -30,6 +30,7 @@ extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
||||||
Oid distributionColumnType);
|
Oid distributionColumnType);
|
||||||
extern uint32 GetNextColocationId(void);
|
extern uint32 GetNextColocationId(void);
|
||||||
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
|
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
|
||||||
|
extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);
|
||||||
|
|
||||||
|
|
||||||
#endif /* COLOCATION_UTILS_H_ */
|
#endif /* COLOCATION_UTILS_H_ */
|
||||||
|
|
Loading…
Reference in New Issue