From c9d0ae2d5daa8819d4803efa3ec510fdb30dd3a4 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Thu, 3 Apr 2025 18:15:47 +0300 Subject: [PATCH 1/3] Fix downgrade and following upgrade --- .../distributed/cdc/cdc_decoder_utils.c | 10 +- .../distributed/metadata/metadata_cache.c | 50 ++++++--- .../distributed/metadata/metadata_sync.c | 8 +- .../distributed/metadata/metadata_utility.c | 106 ++++++++++++------ .../distributed/utils/colocation_utils.c | 27 +++-- src/include/distributed/metadata_utility.h | 1 + 6 files changed, 134 insertions(+), 68 deletions(-) diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c index b571d18b9..096e712ef 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.c +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId) return false; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1] || @@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return false; } @@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); /* * A table is a reference table when its partition method is 'none' diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 79cc61092..c3db44c31 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId) return DISTRIBUTE_BY_INVALID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) @@ -742,6 +743,8 @@ PartitionMethodViaCatalog(Oid relationId) /* partition method cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return DISTRIBUTE_BY_INVALID; } @@ -750,6 +753,8 @@ PartitionMethodViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionMethodChar; } @@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId) return NULL; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_partkey - 1]) @@ -781,6 +786,8 @@ PartitionColumnViaCatalog(Oid relationId) /* partition key cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return NULL; } @@ -795,6 +802,8 @@ PartitionColumnViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return partitionColumn; } @@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId) return INVALID_COLOCATION_ID; } - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); if (isNullArray[Anum_pg_dist_partition_colocationid - 1]) @@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId) /* colocation id cannot be NULL, still let's make sure */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return INVALID_COLOCATION_ID; } @@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId) heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); return colocationId; } @@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId) } MemoryContext oldContext = NULL; - Datum datumArray[Natts_pg_dist_partition]; - bool isNullArray[Natts_pg_dist_partition]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray); CitusTableCacheEntry *cacheEntry = @@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId) cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); } - if (isNullArray[Anum_pg_dist_partition_autoconverted - 1]) + if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]) { /* * We don't expect this to happen, but set it to false (the default value) @@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId) else { cacheEntry->autoConverted = DatumGetBool( - datumArray[Anum_pg_dist_partition_autoconverted - 1]); + datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]); } heap_freetuple(distPartitionTuple); @@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId) table_close(pgDistPartition, NoLock); + pfree(datumArray); + pfree(isNullArray); + cacheEntry->isValid = true; return cacheEntry; @@ -5013,8 +5031,8 @@ CitusTableTypeIdList(CitusTableType citusTableType) HeapTuple heapTuple = systable_getnext(scanDescriptor); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; @@ -5036,6 +5054,8 @@ CitusTableTypeIdList(CitusTableType citusTableType) } heapTuple = systable_getnext(scanDescriptor); + pfree(datumArray); + pfree(isNullArray); } systable_endscan(scanDescriptor); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f73856169..6a1a83314 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -573,10 +573,14 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc { Assert(heapTuple->t_tableOid == DistPartitionRelationId()); - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum* datumArray = (Datum *) palloc0(tupleDesc->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc0(tupleDesc->natts * sizeof(bool)); + heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); + pfree(datumArray); + pfree(isNullArray); + Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; Oid relationId = DatumGetObjectId(relationIdDatum); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index fad263abd..6346611c0 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1919,23 +1919,21 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, { char *distributionColumnString = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; - /* open system catalog and insert new tuple */ Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + + Datum* newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - newValues[Anum_pg_dist_partition_logicalrelid - 1] = ObjectIdGetDatum(relationId); newValues[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); + newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum(autoConverted); /* set partkey column to NULL for reference tables */ if (distributionMethod != DISTRIBUTE_BY_NONE) @@ -1951,7 +1949,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, newNulls[Anum_pg_dist_partition_partkey - 1] = true; } - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, + HeapTuple newTuple = heap_form_tuple(tupleDescriptor, newValues, newNulls); /* finally insert tuple, build index entries & register cache invalidation */ @@ -1963,6 +1961,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CommandCounterIncrement(); table_close(pgDistPartition, NoLock); + + pfree(newValues); + pfree(newNulls); } @@ -2154,13 +2155,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_placement]; - bool isnull[Natts_pg_dist_placement]; - bool replace[Natts_pg_dist_placement]; bool colIsNull = false; Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); + Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); @@ -2177,8 +2178,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) placementId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId); isnull[Anum_pg_dist_placement_groupid - 1] = false; replace[Anum_pg_dist_placement_groupid - 1] = true; @@ -2197,6 +2196,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) systable_endscan(scanDescriptor); table_close(pgDistPlacement, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2210,12 +2213,14 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; + int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId)); @@ -2231,11 +2236,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) citusTableId))); } - memset(replace, 0, sizeof(replace)); - - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); + isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2247,6 +2251,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2286,12 +2294,14 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; + int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut relationId))); } - memset(replace, 0, sizeof(replace)); - replace[Anum_pg_dist_partition_partmethod - 1] = true; values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); isnull[Anum_pg_dist_partition_partmethod - 1] = false; @@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; + autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + replace[autoconvertedindex] = true; + values[autoconvertedindex] = BoolGetDatum(false); + isnull[autoconvertedindex] = false; char *distributionColumnString = nodeToString((Node *) distributionColumn); @@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2380,12 +2393,14 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - Datum values[Natts_pg_dist_partition]; - bool isnull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; + int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2401,8 +2416,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca relationId))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -2411,9 +2424,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca isnull[Anum_pg_dist_partition_repmodel - 1] = false; replace[Anum_pg_dist_partition_repmodel - 1] = true; - values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); - isnull[Anum_pg_dist_partition_autoconverted - 1] = false; - replace[Anum_pg_dist_partition_autoconverted - 1] = true; + autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + values[autoconvertedindex] = BoolGetDatum(autoConverted); + isnull[autoconvertedindex] = false; + replace[autoconvertedindex] = true; heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); @@ -2424,6 +2438,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -4401,3 +4419,17 @@ UnblockDependingBackgroundTasks(BackgroundTask *task) table_close(pgDistBackgroundTasksDepend, NoLock); } + +/* + * AutoConverted attr was added to table pg_dist_partition using alter operation, so + * in case of downgrade we can get additional tuple field in tuple desc with dropped flag + * and in case of futher upgrade we can get number of fields > Natts_pg_dist_partition. + * So we should properly address AutoConverted attr in arrays. + */ +int +GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc) +{ + return TupleDescSize(tupleDEsc) == Natts_pg_dist_partition + ? (Anum_pg_dist_partition_autoconverted - 1) + : tupleDEsc->natts - 1; +} \ No newline at end of file diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index e2af11a1d..933c15765 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, bool indexOK = true; int scanKeyCount = 1; ScanKeyData scanKey[1]; - Datum values[Natts_pg_dist_partition]; - bool isNull[Natts_pg_dist_partition]; - bool replace[Natts_pg_dist_partition]; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool* isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); @@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, distributedRelationName))); } - memset(values, 0, sizeof(values)); - memset(isNull, false, sizeof(isNull)); - memset(replace, false, sizeof(replace)); - values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isNull[Anum_pg_dist_partition_colocationid - 1] = false; replace[Anum_pg_dist_partition_colocationid - 1] = true; @@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, systable_endscan(scanDescriptor); table_close(pgDistPartition, NoLock); + pfree(values); + pfree(isNull); + pfree(replace); + bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId); if (shouldSyncMetadata && !localOnly) { @@ -1000,14 +1001,16 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) HeapTuple heapTuple = systable_getnext(scanDescriptor); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Oid colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId); heapTuple = systable_getnext(scanDescriptor); + pfree(datumArray); + pfree(isNullArray); if (count == 0) { @@ -1194,8 +1197,8 @@ ColocatedTableId(int32 colocationId) HeapTuple heapTuple = systable_getnext(scanDescriptor); while (HeapTupleIsValid(heapTuple)) { - bool isNullArray[Natts_pg_dist_partition]; - Datum datumArray[Natts_pg_dist_partition]; + Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1222,6 +1225,8 @@ ColocatedTableId(int32 colocationId) colocatedTableId = InvalidOid; heapTuple = systable_getnext(scanDescriptor); + pfree(datumArray); + pfree(isNullArray); } systable_endscan(scanDescriptor); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 737e1283b..a8fb4ff9e 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status); extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status); extern Oid BackgroundJobStatusOid(BackgroundJobStatus status); extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status); +extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc); #endif /* METADATA_UTILITY_H */ From f5c419487d1f50f8e161900ed46c2f3fe55480f9 Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Mon, 7 Apr 2025 15:46:53 +0300 Subject: [PATCH 2/3] Fix downgrade and following upgrade, fix review comments --- .../distributed/cdc/cdc_decoder_utils.c | 4 +- .../distributed/metadata/metadata_cache.c | 33 +++++---- .../distributed/metadata/metadata_sync.c | 4 +- .../distributed/metadata/metadata_utility.c | 72 ++++++++++--------- .../distributed/utils/colocation_utils.c | 38 +++++----- src/include/distributed/metadata_utility.h | 13 ++-- 6 files changed, 89 insertions(+), 75 deletions(-) diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c index 096e712ef..9053d1b68 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.c +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -349,8 +349,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId) Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index c3db44c31..4095b2ba3 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -320,9 +320,10 @@ static void CachedRelationNamespaceLookup(const char *relationName, Oid relnames static void CachedRelationNamespaceLookupExtended(const char *relationName, Oid renamespace, Oid *cachedOid, bool missing_ok); -static ShardPlacement * ResolveGroupShardPlacement( - GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, - int shardIndex); +static ShardPlacement * ResolveGroupShardPlacement(GroupShardPlacement * + groupShardPlacement, + CitusTableCacheEntry *tableEntry, + int shardIndex); static Oid LookupEnumValueId(Oid typeId, char *valueName); static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateDistTableCache(void); @@ -733,8 +734,8 @@ PartitionMethodViaCatalog(Oid relationId) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); @@ -776,8 +777,8 @@ PartitionColumnViaCatalog(Oid relationId) Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); @@ -826,8 +827,8 @@ ColocationIdViaCatalog(Oid relationId) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); @@ -1757,8 +1758,8 @@ BuildCitusTableCacheEntry(Oid relationId) MemoryContext oldContext = NULL; TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray); @@ -5029,10 +5030,12 @@ CitusTableTypeIdList(CitusTableType citusTableType) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; @@ -5054,9 +5057,9 @@ CitusTableTypeIdList(CitusTableType citusTableType) } heapTuple = systable_getnext(scanDescriptor); - pfree(datumArray); - pfree(isNullArray); } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6a1a83314..6050076a7 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -573,8 +573,8 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc { Assert(heapTuple->t_tableOid == DistPartitionRelationId()); - Datum* datumArray = (Datum *) palloc0(tupleDesc->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc0(tupleDesc->natts * sizeof(bool)); + Datum *datumArray = (Datum *) palloc0(tupleDesc->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc0(tupleDesc->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 5e1cc96f4..23ead375d 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -127,11 +127,11 @@ static bool SetFieldText(int attno, Datum values[], bool isnull[], bool replace[ static bool SetFieldNull(int attno, Datum values[], bool isnull[], bool replace[]); #define InitFieldValue(attno, values, isnull, initValue) \ - (void) SetFieldValue((attno), (values), (isnull), NULL, (initValue)) + (void) SetFieldValue((attno), (values), (isnull), NULL, (initValue)) #define InitFieldText(attno, values, isnull, initValue) \ - (void) SetFieldText((attno), (values), (isnull), NULL, (initValue)) + (void) SetFieldText((attno), (values), (isnull), NULL, (initValue)) #define InitFieldNull(attno, values, isnull) \ - (void) SetFieldNull((attno), (values), (isnull), NULL) + (void) SetFieldNull((attno), (values), (isnull), NULL) /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(citus_local_disk_space_stats); @@ -822,7 +822,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, /* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */ char *subqueryForPartitionedShards = GenerateSizeQueryForRelationNameList(partitionedShardNames, - GetWorkerPartitionedSizeUDFNameBySizeQueryType( + GetWorkerPartitionedSizeUDFNameBySizeQueryType + ( sizeQueryType)); /* SELECT SUM(pg_..._size) FROM VALUES (...) */ @@ -1923,8 +1924,8 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); /* form new tuple for pg_dist_partition */ newValues[Anum_pg_dist_partition_logicalrelid - 1] = @@ -1933,7 +1934,8 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum(autoConverted); + newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum( + autoConverted); /* set partkey column to NULL for reference tables */ if (distributionMethod != DISTRIBUTE_BY_NONE) @@ -2159,9 +2161,9 @@ UpdatePlacementGroupId(uint64 placementId, int groupId) Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement); - Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); - bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); @@ -2213,13 +2215,12 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); - bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId)); @@ -2236,7 +2237,7 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted) citusTableId))); } - autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); values[autoconvertedindex] = BoolGetDatum(autoConverted); isnull[autoconvertedindex] = false; replace[autoconvertedindex] = true; @@ -2294,13 +2295,12 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); - bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2325,7 +2325,7 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); isnull[Anum_pg_dist_partition_colocationid - 1] = false; - autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); replace[autoconvertedindex] = true; values[autoconvertedindex] = BoolGetDatum(false); isnull[autoconvertedindex] = false; @@ -2393,13 +2393,12 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca ScanKeyData scanKey[1]; int scanKeyCount = 1; bool indexOK = true; - int autoconvertedindex; Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); - bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); @@ -2424,7 +2423,7 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca isnull[Anum_pg_dist_partition_repmodel - 1] = false; replace[Anum_pg_dist_partition_repmodel - 1] = true; - autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); + int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); values[autoconvertedindex] = BoolGetDatum(autoConverted); isnull[autoconvertedindex] = false; replace[autoconvertedindex] = true; @@ -3166,8 +3165,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_nodes_involved - 1] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved); - nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == - 0); + nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == 0) + ; HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), values, nulls); @@ -4243,7 +4242,8 @@ CancelTasksForJob(int64 jobid) const bool indexOK = true; SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, - DistBackgroundTaskJobIdTaskIdIndexId(), + DistBackgroundTaskJobIdTaskIdIndexId() + , indexOK, NULL, lengthof(scanKey), scanKey); @@ -4438,11 +4438,17 @@ UnblockDependingBackgroundTasks(BackgroundTask *task) table_close(pgDistBackgroundTasksDepend, NoLock); } + /* - * AutoConverted attr was added to table pg_dist_partition using alter operation, so - * in case of downgrade we can get additional tuple field in tuple desc with dropped flag - * and in case of futher upgrade we can get number of fields > Natts_pg_dist_partition. - * So we should properly address AutoConverted attr in arrays. + * GetAutoConvertedAttrIndexInPgDistPartition returns attrnum for autoconverted attr. + * + * autoconverted attr was added to table pg_dist_partition using alter operation after + * the version where Citus started supporting downgrades, and it's only column that we've + * introduced to pg_dist_partition since then. + * + * And in case of a downgrade + upgrade, tupleDEsc->natts becomes greater than + * Natts_pg_dist_partition and when this happens, then we know that attrnum autoconverted is + * not Anum_pg_dist_partition_autoconverted anymore but tupleDEsc->natts - 1. */ int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc) @@ -4450,4 +4456,4 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc) return TupleDescSize(tupleDEsc) == Natts_pg_dist_partition ? (Anum_pg_dist_partition_autoconverted - 1) : tupleDEsc->natts - 1; -} \ No newline at end of file +} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 933c15765..be6c4f264 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -557,8 +557,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType, continue; } - if (colocationId == INVALID_COLOCATION_ID || colocationId > - colocationForm->colocationid) + if (colocationId == INVALID_COLOCATION_ID || colocationId > colocationForm-> + colocationid) { /* * We assign the smallest colocation id among all the matches so that we @@ -819,9 +819,9 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId, Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); - bool* isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); - bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId)); @@ -999,18 +999,18 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); Oid colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId); heapTuple = systable_getnext(scanDescriptor); - pfree(datumArray); - pfree(isNullArray); if (count == 0) { @@ -1023,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count) break; } } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); @@ -1075,8 +1077,8 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) * Since we iterate over co-located tables, shard count of each table should be * same and greater than shardIntervalIndex. */ - Assert(cacheEntry->shardIntervalArrayLength == - colocatedTableCacheEntry->shardIntervalArrayLength); + Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry-> + shardIntervalArrayLength); ShardInterval *colocatedShardInterval = colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; @@ -1146,8 +1148,8 @@ ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval) * Since we iterate over co-located tables, shard count of each table should be * same and greater than shardIntervalIndex. */ - Assert(cacheEntry->shardIntervalArrayLength == - colocatedTableCacheEntry->shardIntervalArrayLength); + Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry-> + shardIntervalArrayLength); ShardInterval *colocatedShardInterval = colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; @@ -1195,10 +1197,12 @@ ColocatedTableId(int32 colocationId) indexOK, NULL, scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); while (HeapTupleIsValid(heapTuple)) { - Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); - bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); colocatedTableId = DatumGetObjectId( datumArray[Anum_pg_dist_partition_logicalrelid - 1]); @@ -1225,9 +1229,9 @@ ColocatedTableId(int32 colocationId) colocatedTableId = InvalidOid; heapTuple = systable_getnext(scanDescriptor); - pfree(datumArray); - pfree(isNullArray); } + pfree(datumArray); + pfree(isNullArray); systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index a507138d2..24cd203d2 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -40,7 +40,7 @@ #define WORKER_PARTITIONED_TABLE_SIZE_FUNCTION "worker_partitioned_table_size(%s)" #define WORKER_PARTITIONED_RELATION_SIZE_FUNCTION "worker_partitioned_relation_size(%s)" #define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \ - "worker_partitioned_relation_total_size(%s)" + "worker_partitioned_relation_total_size(%s)" #define SHARD_SIZES_COLUMN_COUNT (2) @@ -302,12 +302,12 @@ typedef struct BackgroundTask } BackgroundTask; #define SET_NULLABLE_FIELD(ptr, field, value) \ - (ptr)->__nullable_storage.field = (value); \ - (ptr)->field = &((ptr)->__nullable_storage.field) + (ptr)->__nullable_storage.field = (value); \ + (ptr)->field = &((ptr)->__nullable_storage.field) #define UNSET_NULLABLE_FIELD(ptr, field) \ - (ptr)->field = NULL; \ - memset_struct_0((ptr)->__nullable_storage.field) + (ptr)->field = NULL; \ + memset_struct_0((ptr)->__nullable_storage.field) /* Size functions */ extern Datum citus_table_size(PG_FUNCTION_ARGS); @@ -345,7 +345,8 @@ extern bool IsDummyPlacement(ShardPlacement *taskPlacement); extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, Oid indexId, SizeQueryType sizeQueryType, - bool optimizePartitionCalculations); + bool optimizePartitionCalculations + ); extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); /* Function declarations to modify shard and shard placement data */ From 2281fb97c28fe984fd2107c9ce78adce1ea62f3f Mon Sep 17 00:00:00 2001 From: Maksim Melnikov Date: Tue, 8 Apr 2025 14:51:06 +0300 Subject: [PATCH 3/3] Fix downgrade and following upgrade, fix review comments --- src/backend/distributed/metadata/metadata_sync.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 6050076a7..4b3458a59 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -578,12 +578,12 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); - pfree(datumArray); - pfree(isNullArray); - Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; Oid relationId = DatumGetObjectId(relationIdDatum); + pfree(datumArray); + pfree(isNullArray); + return relationId; }