From 3341e3d9dcf5b073e2deb142612f04cee34107f7 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Thu, 16 Oct 2025 16:54:22 +0300 Subject: [PATCH] fix create_distributed_table --- .../commands/create_distributed_table.c | 25 ++++++----------- .../distributed/utils/colocation_utils.c | 28 ++++++++----------- src/include/distributed/colocation_utils.h | 6 ++-- 3 files changed, 22 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 38fadb0f3..dc8cc965e 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -530,8 +530,6 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, distributionColumnName, NoLock); - Oid distributionColumnType = distributionColumn->vartype; - Oid distributionColumnCollation = distributionColumn->varcollid; /* get an advisory lock to serialize concurrent default group creations */ if (IsColocateWithDefault(colocateWithTableName)) @@ -546,8 +544,7 @@ CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, */ uint32 colocationId = FindColocateWithColocationId(relationId, replicationModel, - distributionColumnType, - distributionColumnCollation, + distributionColumn, shardCount, shardCountIsStrict, colocateWithTableName); @@ -696,18 +693,14 @@ EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod, char replicationModel = DecideDistTableReplicationModel(distributionMethod, colocateWithTableName); - /* - * we fail transaction before local table conversion if the table could not be colocated with - * given table. We should make those checks after local table conversion by acquiring locks to - * the relation because the distribution column can be modified in that period. - */ - Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId, - distributionColumnName); - text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false); + + Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, + distributionColumnName, + NoLock); EnsureTableCanBeColocatedWith(relationId, replicationModel, - distributionColumnType, colocateWithTableId); + distributionColumn, colocateWithTableId); } @@ -1998,10 +1991,11 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType, * until this transaction is committed. */ + /* distributionColumn can only be null for single-shard tables */ Oid distributionColumnType = distributionColumn ? distributionColumn->vartype : InvalidOid; Oid distributionColumnCollation = - distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid; + distributionColumn ? distributionColumn->varcollid : InvalidOid; Assert(distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_TABLE_LIKE_OPT); @@ -2016,8 +2010,7 @@ ColocationIdForNewTable(Oid relationId, CitusTableType tableType, colocationId = FindColocateWithColocationId(relationId, citusTableParams.replicationModel, - distributionColumnType, - distributionColumnCollation, + distributionColumn, distributedTableParams->shardCount, distributedTableParams-> shardCountIsStrict, diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index af507d5b9..98ada2346 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -1404,8 +1404,7 @@ DeleteColocationGroupLocally(uint32 colocationId) */ uint32 FindColocateWithColocationId(Oid relationId, char replicationModel, - Oid distributionColumnType, - Oid distributionColumnCollation, + Var *distributionColumn, int shardCount, bool shardCountIsStrict, char *colocateWithTableName) { @@ -1413,6 +1412,12 @@ FindColocateWithColocationId(Oid relationId, char replicationModel, if (IsColocateWithDefault(colocateWithTableName)) { + /* distributionColumn can only be null for single-shard tables */ + Oid distributionColumnType = + distributionColumn ? distributionColumn->vartype : InvalidOid; + Oid distributionColumnCollation = + distributionColumn ? distributionColumn->varcollid : InvalidOid; + /* check for default colocation group */ colocationId = ColocationId(shardCount, ShardReplicationFactor, distributionColumnType, @@ -1445,7 +1450,7 @@ FindColocateWithColocationId(Oid relationId, char replicationModel, Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false); EnsureTableCanBeColocatedWith(relationId, replicationModel, - distributionColumnType, sourceRelationId); + distributionColumn, sourceRelationId); colocationId = TableColocationId(sourceRelationId); } @@ -1463,7 +1468,7 @@ FindColocateWithColocationId(Oid relationId, char replicationModel, */ void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, - Oid distributionColumnType, Oid sourceRelationId) + Var *distributionColumn, Oid sourceRelationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); @@ -1491,19 +1496,8 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, } Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); - Oid sourceDistributionColumnType = !sourceDistributionColumn ? InvalidOid : - sourceDistributionColumn->vartype; - if (sourceDistributionColumnType != distributionColumnType) - { - char *relationName = get_rel_name(relationId); - char *sourceRelationName = get_rel_name(sourceRelationId); - - ereport(ERROR, (errmsg("cannot colocate tables %s and %s", - sourceRelationName, relationName), - errdetail("Distribution column types don't match for " - "%s and %s.", sourceRelationName, - relationName))); - } + EnsureColumnTypeEquality(sourceRelationId, relationId, + sourceDistributionColumn, distributionColumn); /* prevent colocating regular tables with tenant tables */ Oid sourceRelationSchemaId = get_rel_namespace(sourceRelationId); diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 018f97570..b5ce0a28f 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -53,13 +53,11 @@ extern List * ColocationGroupTableList(uint32 colocationId, uint32 count); extern void DeleteColocationGroup(uint32 colocationId); extern void DeleteColocationGroupLocally(uint32 colocationId); extern uint32 FindColocateWithColocationId(Oid relationId, char replicationModel, - Oid distributionColumnType, - Oid distributionColumnCollation, + Var *distributionColumn, int shardCount, bool shardCountIsStrict, char *colocateWithTableName); extern void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, - Oid distributionColumnType, - Oid sourceRelationId); + Var *distributionColumn, Oid sourceRelationId); extern void AcquireColocationDefaultLock(void); extern void ReleaseColocationDefaultLock(void);