Fix memory corruptions around pg_dist_object accessors after a Citus downgrade is followed by an upgrade (#8120)

DESCRIPTION: Fixes potential memory corruptions that could happen when
accessing pg_dist_object after a Citus downgrade is followed by a Citus
upgrade.

In case of Citus downgrade and further upgrade an undefined behavior may
be encountered. The reason is that Citus hardcoded the number of columns
in the extension's tables, but in case of downgrade and following update
some of these tables can have more columns, and some of them can be
marked as dropped.

This PR fixes all such tables using the approach introduced in #7950,
which solved the problem for the pg_dist_partition table.

See #7515 for a more thorough explanation.

---------

Co-authored-by: Karina Litskevich <litskevichkarina@gmail.com>
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
pull/7983/head^2
Karina 2025-08-18 15:52:34 +03:00 committed by GitHub
parent e15cc5c63b
commit 2095679dc8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 53 additions and 22 deletions

View File

@ -769,13 +769,16 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
const bool indexOK = true; const bool indexOK = true;
ScanKeyData scanKey[3]; ScanKeyData scanKey[3];
Datum values[Natts_pg_dist_object];
bool isnull[Natts_pg_dist_object];
bool replace[Natts_pg_dist_object];
Relation pgDistObjectRel = table_open(DistObjectRelationId(), RowExclusiveLock); Relation pgDistObjectRel = table_open(DistObjectRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel); TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel);
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
int forseDelegationIndex = GetForceDelegationAttrIndexInPgDistObject(tupleDescriptor);
/* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */ /* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */
ScanKeyInit(&scanKey[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ, ScanKeyInit(&scanKey[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(distAddress->classId)); ObjectIdGetDatum(distAddress->classId));
@ -797,12 +800,7 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
distAddress->objectId, distAddress->objectSubId))); distAddress->objectId, distAddress->objectSubId)));
} }
memset(values, 0, sizeof(values));
memset(isnull, 0, sizeof(isnull));
memset(replace, 0, sizeof(replace));
replace[Anum_pg_dist_object_distribution_argument_index - 1] = true; replace[Anum_pg_dist_object_distribution_argument_index - 1] = true;
if (distribution_argument_index != NULL) if (distribution_argument_index != NULL)
{ {
values[Anum_pg_dist_object_distribution_argument_index - 1] = Int32GetDatum( values[Anum_pg_dist_object_distribution_argument_index - 1] = Int32GetDatum(
@ -825,16 +823,15 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
isnull[Anum_pg_dist_object_colocationid - 1] = true; isnull[Anum_pg_dist_object_colocationid - 1] = true;
} }
replace[Anum_pg_dist_object_force_delegation - 1] = true; replace[forseDelegationIndex] = true;
if (forceDelegation != NULL) if (forceDelegation != NULL)
{ {
values[Anum_pg_dist_object_force_delegation - 1] = BoolGetDatum( values[forseDelegationIndex] = BoolGetDatum(*forceDelegation);
*forceDelegation); isnull[forseDelegationIndex] = false;
isnull[Anum_pg_dist_object_force_delegation - 1] = false;
} }
else else
{ {
isnull[Anum_pg_dist_object_force_delegation - 1] = true; isnull[forseDelegationIndex] = true;
} }
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
@ -849,6 +846,10 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
table_close(pgDistObjectRel, NoLock); table_close(pgDistObjectRel, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
if (EnableMetadataSync) if (EnableMetadataSync)
{ {
List *objectAddressList = list_make1((ObjectAddress *) distAddress); List *objectAddressList = list_make1((ObjectAddress *) distAddress);

View File

@ -680,11 +680,9 @@ UpdateDistributedObjectColocationId(uint32 oldColocationId,
HeapTuple heapTuple; HeapTuple heapTuple;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{ {
Datum values[Natts_pg_dist_object]; Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool isnull[Natts_pg_dist_object]; bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool replace[Natts_pg_dist_object]; bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
memset(replace, 0, sizeof(replace));
replace[Anum_pg_dist_object_colocationid - 1] = true; replace[Anum_pg_dist_object_colocationid - 1] = true;
@ -698,6 +696,10 @@ UpdateDistributedObjectColocationId(uint32 oldColocationId,
CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple); CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(DistObjectRelationId()); CitusInvalidateRelcacheByRelid(DistObjectRelationId());
pfree(values);
pfree(isnull);
pfree(replace);
} }
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
@ -783,3 +785,23 @@ DistributedSequenceList(void)
relation_close(pgDistObjectRel, AccessShareLock); relation_close(pgDistObjectRel, AccessShareLock);
return distributedSequenceList; return distributedSequenceList;
} }
/*
* GetForceDelegationAttrIndexInPgDistObject returns attrnum for force_delegation attr.
*
* force_delegation attr was added to table pg_dist_object using alter operation after
* the version where Citus started supporting downgrades, and it's only column that we've
* introduced to pg_dist_object since then.
*
* And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
* Natts_pg_dist_object and when this happens, then we know that attrnum force_delegation is
* not Anum_pg_dist_object_force_delegation anymore but tupleDesc->natts - 1.
*/
int
GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_pg_dist_object
? (Anum_pg_dist_object_force_delegation - 1)
: tupleDesc->natts - 1;
}

View File

@ -1730,8 +1730,11 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
if (HeapTupleIsValid(pgDistObjectTup)) if (HeapTupleIsValid(pgDistObjectTup))
{ {
Datum datumArray[Natts_pg_dist_object]; Datum *datumArray = palloc(pgDistObjectTupleDesc->natts * sizeof(Datum));
bool isNullArray[Natts_pg_dist_object]; bool *isNullArray = palloc(pgDistObjectTupleDesc->natts * sizeof(bool));
int forseDelegationIndex =
GetForceDelegationAttrIndexInPgDistObject(pgDistObjectTupleDesc);
heap_deform_tuple(pgDistObjectTup, pgDistObjectTupleDesc, datumArray, heap_deform_tuple(pgDistObjectTup, pgDistObjectTupleDesc, datumArray,
isNullArray); isNullArray);
@ -1746,7 +1749,10 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
DatumGetInt32(datumArray[Anum_pg_dist_object_colocationid - 1]); DatumGetInt32(datumArray[Anum_pg_dist_object_colocationid - 1]);
cacheEntry->forceDelegation = cacheEntry->forceDelegation =
DatumGetBool(datumArray[Anum_pg_dist_object_force_delegation - 1]); DatumGetBool(datumArray[forseDelegationIndex]);
pfree(datumArray);
pfree(isNullArray);
} }
else else
{ {

View File

@ -5235,7 +5235,7 @@ SendDistObjectCommands(MetadataSyncContext *context)
bool forceDelegationIsNull = false; bool forceDelegationIsNull = false;
Datum forceDelegationDatum = Datum forceDelegationDatum =
heap_getattr(nextTuple, heap_getattr(nextTuple,
Anum_pg_dist_object_force_delegation, GetForceDelegationAttrIndexInPgDistObject(tupleDesc) + 1,
tupleDesc, tupleDesc,
&forceDelegationIsNull); &forceDelegationIsNull);
bool forceDelegation = DatumGetBool(forceDelegationDatum); bool forceDelegation = DatumGetBool(forceDelegationDatum);

View File

@ -61,4 +61,6 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
#define Anum_pg_dist_object_colocationid 8 #define Anum_pg_dist_object_colocationid 8
#define Anum_pg_dist_object_force_delegation 9 #define Anum_pg_dist_object_force_delegation 9
extern int GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc);
#endif /* PG_DIST_OBJECT_H */ #endif /* PG_DIST_OBJECT_H */