Fix memory corruptions around columnar.stripe accessors after a Citus downgrade is followed by an upgrade (#8124)

DESCRIPTION: Fixes potential memory corruptions that could happen when
accessing columnar.stripe after a Citus downgrade is followed by a Citus
upgrade.

In case of Citus downgrade and further upgrade an undefined behavior may
be encountered. The reason is that Citus hardcoded the number of columns
in the extension's tables, but in case of downgrade and following update
some of these tables can have more columns, and some of them can be
marked as dropped.

This PR fixes all such tables using the approach introduced in
https://github.com/citusdata/citus/pull/7950, which solved the problem
for the pg_dist_partition table.

See https://github.com/citusdata/citus/issues/7515 for a more thorough
explanation.

---------

Co-authored-by: Karina Litskevich <litskevichkarina@gmail.com>
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
pull/8120/head^2
Karina 2025-08-18 15:34:26 +03:00 committed by GitHub
parent badaa21cb1
commit e15cc5c63b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 72 additions and 31 deletions

View File

@ -106,7 +106,9 @@ static void GetHighestUsedAddressAndId(uint64 storageId,
uint64 *highestUsedAddress, uint64 *highestUsedAddress,
uint64 *highestUsedId); uint64 *highestUsedId);
static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, static StripeMetadata * UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId,
bool *update, Datum *newValues); uint64 fileOffset, uint64 dataLength,
uint64 rowCount, uint64 chunkCount);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static StripeMetadata * BuildStripeMetadata(Relation columnarStripes, static StripeMetadata * BuildStripeMetadata(Relation columnarStripes,
HeapTuple heapTuple); HeapTuple heapTuple);
@ -183,6 +185,8 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_stripe_chunk_count 8 #define Anum_columnar_stripe_chunk_count 8
#define Anum_columnar_stripe_first_row_number 9 #define Anum_columnar_stripe_first_row_number 9
static int GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc);
/* constants for columnar.chunk_group */ /* constants for columnar.chunk_group */
#define Natts_columnar_chunkgroup 4 #define Natts_columnar_chunkgroup 4
#define Anum_columnar_chunkgroup_storageid 1 #define Anum_columnar_chunkgroup_storageid 1
@ -942,10 +946,12 @@ StripeMetadataLookupRowNumber(Relation relation, uint64 rowNumber, Snapshot snap
strategyNumber = BTGreaterStrategyNumber; strategyNumber = BTGreaterStrategyNumber;
procedure = F_INT8GT; procedure = F_INT8GT;
} }
ScanKeyInit(&scanKey[1], Anum_columnar_stripe_first_row_number,
strategyNumber, procedure, Int64GetDatum(rowNumber));
Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock); Relation columnarStripes = table_open(ColumnarStripeRelationId(), AccessShareLock);
TupleDesc tupleDesc = RelationGetDescr(columnarStripes);
ScanKeyInit(&scanKey[1], GetFirstRowNumberAttrIndexInColumnarStripe(tupleDesc) + 1,
strategyNumber, procedure, Int64GetDatum(rowNumber));
Oid indexId = ColumnarStripeFirstRowNumberIndexRelationId(); Oid indexId = ColumnarStripeFirstRowNumberIndexRelationId();
bool indexOk = OidIsValid(indexId); bool indexOk = OidIsValid(indexId);
@ -1210,9 +1216,13 @@ static void
InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount, InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCount,
uint32 chunkGroupRowCount, uint64 firstRowNumber) uint32 chunkGroupRowCount, uint64 firstRowNumber)
{ {
bool nulls[Natts_columnar_stripe] = { false }; Oid columnarStripesOid = ColumnarStripeRelationId();
Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
Datum values[Natts_columnar_stripe] = { 0 };
values[Anum_columnar_stripe_storageid - 1] = values[Anum_columnar_stripe_storageid - 1] =
UInt64GetDatum(storageId); UInt64GetDatum(storageId);
values[Anum_columnar_stripe_stripe - 1] = values[Anum_columnar_stripe_stripe - 1] =
@ -1221,7 +1231,7 @@ InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCou
UInt32GetDatum(columnCount); UInt32GetDatum(columnCount);
values[Anum_columnar_stripe_chunk_row_count - 1] = values[Anum_columnar_stripe_chunk_row_count - 1] =
UInt32GetDatum(chunkGroupRowCount); UInt32GetDatum(chunkGroupRowCount);
values[Anum_columnar_stripe_first_row_number - 1] = values[GetFirstRowNumberAttrIndexInColumnarStripe(tupleDescriptor)] =
UInt64GetDatum(firstRowNumber); UInt64GetDatum(firstRowNumber);
/* stripe has no rows yet, so initialize rest of the columns accordingly */ /* stripe has no rows yet, so initialize rest of the columns accordingly */
@ -1234,9 +1244,6 @@ InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCou
values[Anum_columnar_stripe_chunk_count - 1] = values[Anum_columnar_stripe_chunk_count - 1] =
UInt32GetDatum(0); UInt32GetDatum(0);
Oid columnarStripesOid = ColumnarStripeRelationId();
Relation columnarStripes = table_open(columnarStripesOid, RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(columnarStripes); ModifyState *modifyState = StartModifyRelation(columnarStripes);
InsertTupleAndEnforceConstraints(modifyState, values, nulls); InsertTupleAndEnforceConstraints(modifyState, values, nulls);
@ -1244,6 +1251,9 @@ InsertEmptyStripeMetadataRow(uint64 storageId, uint64 stripeId, uint32 columnCou
FinishModifyRelation(modifyState); FinishModifyRelation(modifyState);
table_close(columnarStripes, RowExclusiveLock); table_close(columnarStripes, RowExclusiveLock);
pfree(values);
pfree(nulls);
} }
@ -1354,19 +1364,8 @@ CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes,
uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes); uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
uint64 storageId = ColumnarStorageGetStorageId(rel, false); uint64 storageId = ColumnarStorageGetStorageId(rel, false);
bool update[Natts_columnar_stripe] = { false }; return UpdateStripeMetadataRow(storageId, stripeId, resLogicalStart,
update[Anum_columnar_stripe_file_offset - 1] = true; sizeBytes, rowCount, chunkCount);
update[Anum_columnar_stripe_data_length - 1] = true;
update[Anum_columnar_stripe_row_count - 1] = true;
update[Anum_columnar_stripe_chunk_count - 1] = true;
Datum newValues[Natts_columnar_stripe] = { 0 };
newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(resLogicalStart);
newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(sizeBytes);
newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
return UpdateStripeMetadataRow(storageId, stripeId, update, newValues);
} }
@ -1377,8 +1376,8 @@ CompleteStripeReservation(Relation rel, uint64 stripeId, uint64 sizeBytes,
* of stripe metadata should be updated according to modifications done. * of stripe metadata should be updated according to modifications done.
*/ */
static StripeMetadata * static StripeMetadata *
UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update, UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, uint64 fileOffset,
Datum *newValues) uint64 dataLength, uint64 rowCount, uint64 chunkCount)
{ {
SnapshotData dirtySnapshot; SnapshotData dirtySnapshot;
InitDirtySnapshot(dirtySnapshot); InitDirtySnapshot(dirtySnapshot);
@ -1402,6 +1401,7 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
#endif #endif
Relation columnarStripes = table_open(columnarStripesOid, openLockMode); Relation columnarStripes = table_open(columnarStripesOid, openLockMode);
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
Oid indexId = ColumnarStripePKeyIndexRelationId(); Oid indexId = ColumnarStripePKeyIndexRelationId();
bool indexOk = OidIsValid(indexId); bool indexOk = OidIsValid(indexId);
@ -1424,8 +1424,20 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
storageId, stripeId))); storageId, stripeId)));
} }
bool newNulls[Natts_columnar_stripe] = { false }; Datum *newValues = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes); bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
bool *update = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
update[Anum_columnar_stripe_file_offset - 1] = true;
update[Anum_columnar_stripe_data_length - 1] = true;
update[Anum_columnar_stripe_row_count - 1] = true;
update[Anum_columnar_stripe_chunk_count - 1] = true;
newValues[Anum_columnar_stripe_file_offset - 1] = Int64GetDatum(fileOffset);
newValues[Anum_columnar_stripe_data_length - 1] = Int64GetDatum(dataLength);
newValues[Anum_columnar_stripe_row_count - 1] = UInt64GetDatum(rowCount);
newValues[Anum_columnar_stripe_chunk_count - 1] = Int32GetDatum(chunkCount);
HeapTuple modifiedTuple = heap_modify_tuple(oldTuple, HeapTuple modifiedTuple = heap_modify_tuple(oldTuple,
tupleDescriptor, tupleDescriptor,
newValues, newValues,
@ -1464,6 +1476,10 @@ UpdateStripeMetadataRow(uint64 storageId, uint64 stripeId, bool *update,
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(columnarStripes, openLockMode); table_close(columnarStripes, openLockMode);
pfree(newValues);
pfree(newNulls);
pfree(update);
/* return StripeMetadata object built from modified tuple */ /* return StripeMetadata object built from modified tuple */
return modifiedStripeMetadata; return modifiedStripeMetadata;
} }
@ -1527,10 +1543,12 @@ BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
{ {
Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId()); Assert(RelationGetRelid(columnarStripes) == ColumnarStripeRelationId());
Datum datumArray[Natts_columnar_stripe]; TupleDesc tupleDescriptor = RelationGetDescr(columnarStripes);
bool isNullArray[Natts_columnar_stripe];
heap_deform_tuple(heapTuple, RelationGetDescr(columnarStripes), Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
datumArray, isNullArray); bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata)); StripeMetadata *stripeMetadata = palloc0(sizeof(StripeMetadata));
stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]); stripeMetadata->id = DatumGetInt64(datumArray[Anum_columnar_stripe_stripe - 1]);
@ -1547,7 +1565,10 @@ BuildStripeMetadata(Relation columnarStripes, HeapTuple heapTuple)
stripeMetadata->rowCount = DatumGetInt64( stripeMetadata->rowCount = DatumGetInt64(
datumArray[Anum_columnar_stripe_row_count - 1]); datumArray[Anum_columnar_stripe_row_count - 1]);
stripeMetadata->firstRowNumber = DatumGetUInt64( stripeMetadata->firstRowNumber = DatumGetUInt64(
datumArray[Anum_columnar_stripe_first_row_number - 1]); datumArray[GetFirstRowNumberAttrIndexInColumnarStripe(tupleDescriptor)]);
pfree(datumArray);
pfree(isNullArray);
/* /*
* If there is unflushed data in a parent transaction, then we would * If there is unflushed data in a parent transaction, then we would
@ -2095,3 +2116,23 @@ GetHighestUsedRowNumber(uint64 storageId)
return highestRowNumber; return highestRowNumber;
} }
/*
* GetFirstRowNumberAttrIndexInColumnarStripe returns attrnum for first_row_number attr.
*
* first_row_number attr was added to table columnar.stripe using alter operation after
* the version where Citus started supporting downgrades, and it's only column that we've
* introduced to columnar.stripe since then.
*
* And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
* Natts_columnar_stripe and when this happens, then we know that attrnum first_row_number is
* not Anum_columnar_stripe_first_row_number anymore but tupleDesc->natts - 1.
*/
static int
GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_columnar_stripe
? (Anum_columnar_stripe_first_row_number - 1)
: tupleDesc->natts - 1;
}