Fix memory corruptions around pg_dist_background_task accessors after a Citus downgrade is followed by an upgrade (#8114)

DESCRIPTION: Fixes potential memory corruptions that could happen when
accessing pg_dist_background_task after a Citus downgrade is followed by
a Citus upgrade.

After a Citus downgrade followed by an upgrade, undefined behavior may
be encountered. The reason is that Citus hardcoded the number of columns
in the extension's tables, but after a downgrade and a subsequent upgrade
some of these tables can have more columns, and some of those columns can
be marked as dropped.

This PR fixes all such tables using the approach introduced in #7950,
which solved the problem for the pg_dist_partition table.

See #7515 for a more thorough explanation.

---------

Co-authored-by: Karina Litskevich <litskevichkarina@gmail.com>
Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
pull/8106/head
Karina 2025-08-11 18:34:06 +03:00 committed by GitHub
parent 6b6d959fac
commit 71d6328378
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 86 additions and 31 deletions

View File

@ -3130,10 +3130,10 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
/* 2. insert new task */
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTask);
memset(nulls, true, sizeof(nulls));
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
int64 taskId = GetNextBackgroundTaskTaskId();
@ -3164,15 +3164,17 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum("");
nulls[Anum_pg_dist_background_task_message - 1] = false;
values[Anum_pg_dist_background_task_nodes_involved - 1] =
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[Anum_pg_dist_background_task_nodes_involved - 1] =
(nodesInvolvedCount == 0);
int nodesInvolvedIndex =
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
values[nodesInvolvedIndex] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
nulls[nodesInvolvedIndex] = (nodesInvolvedCount == 0);
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
values, nulls);
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls);
CatalogTupleInsert(pgDistBackgroundTask, newTuple);
pfree(values);
pfree(nulls);
task = palloc0(sizeof(BackgroundTask));
task->taskid = taskId;
task->status = BACKGROUND_TASK_STATUS_RUNNABLE;
@ -3285,11 +3287,12 @@ ResetRunningBackgroundTasks(void)
List *taskIdsToWait = NIL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks);
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull);
values[Anum_pg_dist_background_task_status - 1] =
@ -3358,6 +3361,10 @@ ResetRunningBackgroundTasks(void)
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
pfree(values);
pfree(isnull);
pfree(replace);
}
if (list_length(taskIdsToWait) > 0)
@ -3441,8 +3448,9 @@ DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple)
static BackgroundTask *
DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
BackgroundTask *task = palloc0(sizeof(BackgroundTask));
@ -3480,13 +3488,18 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple)
TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]);
}
if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1])
int nodesInvolvedIndex =
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor);
if (!nulls[nodesInvolvedIndex])
{
ArrayType *nodesInvolvedArrayObject =
DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]);
DatumGetArrayTypeP(values[nodesInvolvedIndex]);
task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject);
}
pfree(values);
pfree(nulls);
return task;
}
@ -3751,8 +3764,8 @@ JobTasksStatusCount(int64 jobId)
HeapTuple heapTuple = NULL;
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -3760,6 +3773,9 @@ JobTasksStatusCount(int64 jobId)
1]);
BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid);
pfree(values);
pfree(isnull);
switch (status)
{
case BACKGROUND_TASK_STATUS_BLOCKED:
@ -4012,9 +4028,9 @@ UpdateBackgroundJob(int64 jobId)
UINT64_FORMAT, jobId)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -4058,6 +4074,10 @@ UpdateBackgroundJob(int64 jobId)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundJobs, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -4093,9 +4113,9 @@ UpdateBackgroundTask(BackgroundTask *task)
task->jobid, task->taskid)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull);
@ -4164,6 +4184,10 @@ UpdateBackgroundTask(BackgroundTask *task)
systable_endscan(scanDescriptor);
table_close(pgDistBackgroundTasks, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -4251,9 +4275,10 @@ CancelTasksForJob(int64 jobid)
HeapTuple taskTuple = NULL;
while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor)))
{
Datum values[Natts_pg_dist_background_task] = { 0 };
bool nulls[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls);
Oid statusOid =
@ -4302,6 +4327,10 @@ CancelTasksForJob(int64 jobid)
taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls,
replace);
CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple);
pfree(values);
pfree(nulls);
pfree(replace);
}
systable_endscan(scanDescriptor);
@ -4358,9 +4387,9 @@ UnscheduleDependentTasks(BackgroundTask *task)
"task_id: " UINT64_FORMAT, cTaskId)));
}
Datum values[Natts_pg_dist_background_task] = { 0 };
bool isnull[Natts_pg_dist_background_task] = { 0 };
bool replace[Natts_pg_dist_background_task] = { 0 };
Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
values[Anum_pg_dist_background_task_status - 1] =
ObjectIdGetDatum(CitusTaskStatusUnscheduledId());
@ -4372,6 +4401,10 @@ UnscheduleDependentTasks(BackgroundTask *task)
CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple);
systable_endscan(scanDescriptor);
pfree(values);
pfree(isnull);
pfree(replace);
}
}
@ -4457,3 +4490,23 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
? (Anum_pg_dist_partition_autoconverted - 1)
: tupleDesc->natts - 1;
}
/*
 * GetNodesInvolvedAttrIndexInPgDistBackgroundTask returns the 0-based index of
 * the nodes_involved attribute for the given pg_dist_background_task tuple
 * descriptor, i.e. the position to use in the values / nulls arrays passed to
 * heap_form_tuple() and heap_deform_tuple().
 *
 * nodes_involved attr was added to table pg_dist_background_task using an alter
 * operation after the version where Citus started supporting downgrades, and
 * it's the only column that we've introduced to pg_dist_background_task since
 * then.
 *
 * In case of a downgrade + upgrade, tupleDesc->natts becomes greater than
 * Natts_pg_dist_background_task, and when this happens we know that the index
 * of nodes_involved is not Anum_pg_dist_background_task_nodes_involved - 1
 * anymore but tupleDesc->natts - 1, i.e. the last attribute.
 *
 * The attribute count must be read from tupleDesc->natts: TupleDescSize()
 * yields the size of the descriptor in bytes, not its number of attributes,
 * so it must not be compared against Natts_pg_dist_background_task.
 */
int
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
{
	return tupleDesc->natts == Natts_pg_dist_background_task
		   ? (Anum_pg_dist_background_task_nodes_involved - 1)
		   : tupleDesc->natts - 1;
}

View File

@ -27,4 +27,6 @@
#define Anum_pg_dist_background_task_message 9
#define Anum_pg_dist_background_task_nodes_involved 10
extern int GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc);
#endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */