Fix downgrade and following upgrade, fix review comments

pull/7950/head
Maksim Melnikov 2025-04-07 15:46:53 +03:00
parent 458e6abe6d
commit f5c419487d
6 changed files with 89 additions and 75 deletions

View File

@ -320,8 +320,9 @@ static void CachedRelationNamespaceLookup(const char *relationName, Oid relnames
static void CachedRelationNamespaceLookupExtended(const char *relationName, static void CachedRelationNamespaceLookupExtended(const char *relationName,
Oid renamespace, Oid *cachedOid, Oid renamespace, Oid *cachedOid,
bool missing_ok); bool missing_ok);
static ShardPlacement * ResolveGroupShardPlacement( static ShardPlacement * ResolveGroupShardPlacement(GroupShardPlacement *
GroupShardPlacement *groupShardPlacement, CitusTableCacheEntry *tableEntry, groupShardPlacement,
CitusTableCacheEntry *tableEntry,
int shardIndex); int shardIndex);
static Oid LookupEnumValueId(Oid typeId, char *valueName); static Oid LookupEnumValueId(Oid typeId, char *valueName);
static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot); static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSlot);
@ -5029,10 +5030,12 @@ CitusTableTypeIdList(CitusTableType citusTableType)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
@ -5054,9 +5057,9 @@ CitusTableTypeIdList(CitusTableType citusTableType)
} }
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
}
pfree(datumArray); pfree(datumArray);
pfree(isNullArray); pfree(isNullArray);
}
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock); table_close(pgDistPartition, AccessShareLock);

View File

@ -822,7 +822,8 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
/* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */ /* SELECT SUM(worker_partitioned_...) FROM VALUES (...) */
char *subqueryForPartitionedShards = char *subqueryForPartitionedShards =
GenerateSizeQueryForRelationNameList(partitionedShardNames, GenerateSizeQueryForRelationNameList(partitionedShardNames,
GetWorkerPartitionedSizeUDFNameBySizeQueryType( GetWorkerPartitionedSizeUDFNameBySizeQueryType
(
sizeQueryType)); sizeQueryType));
/* SELECT SUM(pg_..._size) FROM VALUES (...) */ /* SELECT SUM(pg_..._size) FROM VALUES (...) */
@ -1933,7 +1934,8 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
CharGetDatum(distributionMethod); CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum(autoConverted); newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum(
autoConverted);
/* set partkey column to NULL for reference tables */ /* set partkey column to NULL for reference tables */
if (distributionMethod != DISTRIBUTE_BY_NONE) if (distributionMethod != DISTRIBUTE_BY_NONE)
@ -2213,7 +2215,6 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
bool indexOK = true; bool indexOK = true;
int autoconvertedindex;
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -2236,7 +2237,7 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
citusTableId))); citusTableId)));
} }
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
values[autoconvertedindex] = BoolGetDatum(autoConverted); values[autoconvertedindex] = BoolGetDatum(autoConverted);
isnull[autoconvertedindex] = false; isnull[autoconvertedindex] = false;
replace[autoconvertedindex] = true; replace[autoconvertedindex] = true;
@ -2294,7 +2295,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
bool indexOK = true; bool indexOK = true;
int autoconvertedindex;
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -2325,7 +2325,7 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false; isnull[Anum_pg_dist_partition_colocationid - 1] = false;
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
replace[autoconvertedindex] = true; replace[autoconvertedindex] = true;
values[autoconvertedindex] = BoolGetDatum(false); values[autoconvertedindex] = BoolGetDatum(false);
isnull[autoconvertedindex] = false; isnull[autoconvertedindex] = false;
@ -2393,7 +2393,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
bool indexOK = true; bool indexOK = true;
int autoconvertedindex;
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock); Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -2424,7 +2423,7 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
isnull[Anum_pg_dist_partition_repmodel - 1] = false; isnull[Anum_pg_dist_partition_repmodel - 1] = false;
replace[Anum_pg_dist_partition_repmodel - 1] = true; replace[Anum_pg_dist_partition_repmodel - 1] = true;
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor); int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
values[autoconvertedindex] = BoolGetDatum(autoConverted); values[autoconvertedindex] = BoolGetDatum(autoConverted);
isnull[autoconvertedindex] = false; isnull[autoconvertedindex] = false;
replace[autoconvertedindex] = true; replace[autoconvertedindex] = true;
@ -3166,8 +3165,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_nodes_involved - 1] = values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved); IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount == 0)
0); ;
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls); values, nulls);
@ -4243,7 +4242,8 @@ CancelTasksForJob(int64 jobid)
const bool indexOK = true; const bool indexOK = true;
SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks, SysScanDesc scanDescriptor = systable_beginscan(pgDistBackgroundTasks,
DistBackgroundTaskJobIdTaskIdIndexId(), DistBackgroundTaskJobIdTaskIdIndexId()
,
indexOK, NULL, indexOK, NULL,
lengthof(scanKey), scanKey); lengthof(scanKey), scanKey);
@ -4438,11 +4438,17 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
table_close(pgDistBackgroundTasksDepend, NoLock); table_close(pgDistBackgroundTasksDepend, NoLock);
} }
/* /*
* AutoConverted attr was added to table pg_dist_partition using alter operation, so * GetAutoConvertedAttrIndexInPgDistPartition returns attrnum for autoconverted attr.
* in case of downgrade we can get additional tuple field in tuple desc with dropped flag *
* and in case of futher upgrade we can get number of fields > Natts_pg_dist_partition. * autoconverted attr was added to table pg_dist_partition using alter operation after
* So we should properly address AutoConverted attr in arrays. * the version where Citus started supporting downgrades, and it's only column that we've
* introduced to pg_dist_partition since then.
*
* And in case of a downgrade + upgrade, tupleDEsc->natts becomes greater than
* Natts_pg_dist_partition and when this happens, then we know that attrnum autoconverted is
* not Anum_pg_dist_partition_autoconverted anymore but tupleDEsc->natts - 1.
*/ */
int int
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc) GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc)

View File

@ -557,8 +557,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
continue; continue;
} }
if (colocationId == INVALID_COLOCATION_ID || colocationId > if (colocationId == INVALID_COLOCATION_ID || colocationId > colocationForm->
colocationForm->colocationid) colocationid)
{ {
/* /*
* We assign the smallest colocation id among all the matches so that we * We assign the smallest colocation id among all the matches so that we
@ -999,18 +999,18 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
indexOK, NULL, scanKeyCount, scanKey); indexOK, NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
Oid colocatedTableId = DatumGetObjectId( Oid colocatedTableId = DatumGetObjectId(
datumArray[Anum_pg_dist_partition_logicalrelid - 1]); datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId); colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
pfree(datumArray);
pfree(isNullArray);
if (count == 0) if (count == 0)
{ {
@ -1023,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
break; break;
} }
} }
pfree(datumArray);
pfree(isNullArray);
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock); table_close(pgDistPartition, AccessShareLock);
@ -1075,8 +1077,8 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be * Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex. * same and greater than shardIntervalIndex.
*/ */
Assert(cacheEntry->shardIntervalArrayLength == Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
colocatedTableCacheEntry->shardIntervalArrayLength); shardIntervalArrayLength);
ShardInterval *colocatedShardInterval = ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
@ -1146,8 +1148,8 @@ ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval)
* Since we iterate over co-located tables, shard count of each table should be * Since we iterate over co-located tables, shard count of each table should be
* same and greater than shardIntervalIndex. * same and greater than shardIntervalIndex.
*/ */
Assert(cacheEntry->shardIntervalArrayLength == Assert(cacheEntry->shardIntervalArrayLength == colocatedTableCacheEntry->
colocatedTableCacheEntry->shardIntervalArrayLength); shardIntervalArrayLength);
ShardInterval *colocatedShardInterval = ShardInterval *colocatedShardInterval =
colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex]; colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
@ -1195,10 +1197,12 @@ ColocatedTableId(int32 colocationId)
indexOK, NULL, scanKeyCount, scanKey); indexOK, NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
colocatedTableId = DatumGetObjectId( colocatedTableId = DatumGetObjectId(
datumArray[Anum_pg_dist_partition_logicalrelid - 1]); datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
@ -1225,9 +1229,9 @@ ColocatedTableId(int32 colocationId)
colocatedTableId = InvalidOid; colocatedTableId = InvalidOid;
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
}
pfree(datumArray); pfree(datumArray);
pfree(isNullArray); pfree(isNullArray);
}
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock); table_close(pgDistPartition, AccessShareLock);

View File

@ -345,7 +345,8 @@ extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId, Oid indexId,
SizeQueryType sizeQueryType, SizeQueryType sizeQueryType,
bool optimizePartitionCalculations); bool optimizePartitionCalculations
);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList); extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
/* Function declarations to modify shard and shard placement data */ /* Function declarations to modify shard and shard placement data */