fix create_distributed_table

fix-collation-mismatch-create-dist-table
Onur Tirtir 2025-10-16 16:54:22 +03:00
parent aa0ac0af60
commit 3341e3d9dc
3 changed files with 22 additions and 37 deletions

View File

@ -530,8 +530,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName, distributionColumnName,
NoLock); NoLock);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = distributionColumn->varcollid;
/* get an advisory lock to serialize concurrent default group creations */ /* get an advisory lock to serialize concurrent default group creations */
if (IsColocateWithDefault(colocateWithTableName)) if (IsColocateWithDefault(colocateWithTableName))
@ -546,8 +544,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName,
*/ */
uint32 colocationId = FindColocateWithColocationId(relationId, uint32 colocationId = FindColocateWithColocationId(relationId,
replicationModel, replicationModel,
distributionColumnType, distributionColumn,
distributionColumnCollation,
shardCount, shardCount,
shardCountIsStrict, shardCountIsStrict,
colocateWithTableName); colocateWithTableName);
@ -696,18 +693,14 @@ EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod,
char replicationModel = DecideDistTableReplicationModel(distributionMethod, char replicationModel = DecideDistTableReplicationModel(distributionMethod,
colocateWithTableName); colocateWithTableName);
/*
* we fail transaction before local table conversion if the table could not be colocated with
* given table. We should make those checks after local table conversion by acquiring locks to
* the relation because the distribution column can be modified in that period.
*/
Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId,
distributionColumnName);
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false); Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId,
distributionColumnName,
NoLock);
EnsureTableCanBeColocatedWith(relationId, replicationModel, EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, colocateWithTableId); distributionColumn, colocateWithTableId);
} }
@ -1998,10 +1991,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
* until this transaction is committed. * until this transaction is committed.
*/ */
/* distributionColumn can only be null for single-shard tables */
Oid distributionColumnType = Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid; distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation = Oid distributionColumnCollation =
distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid; distributionColumn ? distributionColumn->varcollid : InvalidOid;
Assert(distributedTableParams->colocationParam.colocationParamType == Assert(distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_TABLE_LIKE_OPT); COLOCATE_WITH_TABLE_LIKE_OPT);
@ -2016,8 +2010,7 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
colocationId = FindColocateWithColocationId(relationId, colocationId = FindColocateWithColocationId(relationId,
citusTableParams.replicationModel, citusTableParams.replicationModel,
distributionColumnType, distributionColumn,
distributionColumnCollation,
distributedTableParams->shardCount, distributedTableParams->shardCount,
distributedTableParams-> distributedTableParams->
shardCountIsStrict, shardCountIsStrict,

View File

@ -1404,8 +1404,7 @@ DeleteColocationGroupLocally(uint32 colocationId)
*/ */
uint32 uint32
FindColocateWithColocationId(Oid relationId, char replicationModel, FindColocateWithColocationId(Oid relationId, char replicationModel,
Oid distributionColumnType, Var *distributionColumn,
Oid distributionColumnCollation,
int shardCount, bool shardCountIsStrict, int shardCount, bool shardCountIsStrict,
char *colocateWithTableName) char *colocateWithTableName)
{ {
@ -1413,6 +1412,12 @@ FindColocateWithColocationId(Oid relationId, char replicationModel,
if (IsColocateWithDefault(colocateWithTableName)) if (IsColocateWithDefault(colocateWithTableName))
{ {
/* distributionColumn can only be null for single-shard tables */
Oid distributionColumnType =
distributionColumn ? distributionColumn->vartype : InvalidOid;
Oid distributionColumnCollation =
distributionColumn ? distributionColumn->varcollid : InvalidOid;
/* check for default colocation group */ /* check for default colocation group */
colocationId = ColocationId(shardCount, ShardReplicationFactor, colocationId = ColocationId(shardCount, ShardReplicationFactor,
distributionColumnType, distributionColumnType,
@ -1445,7 +1450,7 @@ FindColocateWithColocationId(Oid relationId, char replicationModel,
Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false); Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);
EnsureTableCanBeColocatedWith(relationId, replicationModel, EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, sourceRelationId); distributionColumn, sourceRelationId);
colocationId = TableColocationId(sourceRelationId); colocationId = TableColocationId(sourceRelationId);
} }
@ -1463,7 +1468,7 @@ FindColocateWithColocationId(Oid relationId, char replicationModel,
*/ */
void void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId) Var *distributionColumn, Oid sourceRelationId)
{ {
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
@ -1491,19 +1496,8 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
} }
Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId);
Oid sourceDistributionColumnType = !sourceDistributionColumn ? InvalidOid : EnsureColumnTypeEquality(sourceRelationId, relationId,
sourceDistributionColumn->vartype; sourceDistributionColumn, distributionColumn);
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
relationName)));
}
/* prevent colocating regular tables with tenant tables */ /* prevent colocating regular tables with tenant tables */
Oid sourceRelationSchemaId = get_rel_namespace(sourceRelationId); Oid sourceRelationSchemaId = get_rel_namespace(sourceRelationId);

View File

@ -53,13 +53,11 @@ extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
extern void DeleteColocationGroup(uint32 colocationId); extern void DeleteColocationGroup(uint32 colocationId);
extern void DeleteColocationGroupLocally(uint32 colocationId); extern void DeleteColocationGroupLocally(uint32 colocationId);
extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel, extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel,
Oid distributionColumnType, Var *distributionColumn,
Oid distributionColumnCollation,
int shardCount, bool shardCountIsStrict, int shardCount, bool shardCountIsStrict,
char *colocateWithTableName); char *colocateWithTableName);
extern void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, extern void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Var *distributionColumn, Oid sourceRelationId);
Oid sourceRelationId);
extern void AcquireColocationDefaultLock(void); extern void AcquireColocationDefaultLock(void);
extern void ReleaseColocationDefaultLock(void); extern void ReleaseColocationDefaultLock(void);