diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index 2b8bd0d1c..b84260f9e 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -3130,10 +3130,10 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC /* 2. insert new task */ { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool nulls[Natts_pg_dist_background_task] = { 0 }; + TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTask); - memset(nulls, true, sizeof(nulls)); + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); int64 taskId = GetNextBackgroundTaskTaskId(); @@ -3164,15 +3164,17 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC values[Anum_pg_dist_background_task_message - 1] = CStringGetTextDatum(""); nulls[Anum_pg_dist_background_task_message - 1] = false; - values[Anum_pg_dist_background_task_nodes_involved - 1] = - IntArrayToDatum(nodesInvolvedCount, nodesInvolved); - nulls[Anum_pg_dist_background_task_nodes_involved - 1] = - (nodesInvolvedCount == 0); + int nodesInvolvedIndex = + GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor); + values[nodesInvolvedIndex] = IntArrayToDatum(nodesInvolvedCount, nodesInvolved); + nulls[nodesInvolvedIndex] = (nodesInvolvedCount == 0); - HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask), - values, nulls); + HeapTuple newTuple = heap_form_tuple(tupleDescriptor, values, nulls); CatalogTupleInsert(pgDistBackgroundTask, newTuple); + pfree(values); + pfree(nulls); + task = palloc0(sizeof(BackgroundTask)); task->taskid = taskId; task->status = BACKGROUND_TASK_STATUS_RUNNABLE; @@ -3285,11 +3287,12 @@ ResetRunningBackgroundTasks(void) List *taskIdsToWait = NIL; while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; - TupleDesc tupleDescriptor = RelationGetDescr(pgDistBackgroundTasks); + + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, isnull); values[Anum_pg_dist_background_task_status - 1] = @@ -3358,6 +3361,10 @@ ResetRunningBackgroundTasks(void) replace); CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); + + pfree(values); + pfree(isnull); + pfree(replace); } if (list_length(taskIdsToWait) > 0) @@ -3441,8 +3448,9 @@ DeformBackgroundJobHeapTuple(TupleDesc tupleDescriptor, HeapTuple jobTuple) static BackgroundTask * DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool nulls[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); BackgroundTask *task = palloc0(sizeof(BackgroundTask)); @@ -3480,13 +3488,18 @@ DeformBackgroundTaskHeapTuple(TupleDesc tupleDescriptor, HeapTuple taskTuple) TextDatumGetCString(values[Anum_pg_dist_background_task_message - 1]); } - if (!nulls[Anum_pg_dist_background_task_nodes_involved - 1]) + int nodesInvolvedIndex = + GetNodesInvolvedAttrIndexInPgDistBackgroundTask(tupleDescriptor); + if (!nulls[nodesInvolvedIndex]) { ArrayType *nodesInvolvedArrayObject = - DatumGetArrayTypeP(values[Anum_pg_dist_background_task_nodes_involved - 1]); + DatumGetArrayTypeP(values[nodesInvolvedIndex]); task->nodesInvolved = IntegerArrayTypeToList(nodesInvolvedArrayObject); } + pfree(values); + pfree(nulls); + return task; } @@ -3751,8 +3764,8 @@ JobTasksStatusCount(int64 jobId) HeapTuple heapTuple = NULL; while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -3760,6 +3773,9 @@ JobTasksStatusCount(int64 jobId) 1]); BackgroundTaskStatus status = BackgroundTaskStatusByOid(statusOid); + pfree(values); + pfree(isnull); + switch (status) { case BACKGROUND_TASK_STATUS_BLOCKED: @@ -4012,9 +4028,9 @@ UpdateBackgroundJob(int64 jobId) UINT64_FORMAT, jobId))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -4058,6 +4074,10 @@ UpdateBackgroundJob(int64 jobId) systable_endscan(scanDescriptor); table_close(pgDistBackgroundJobs, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -4093,9 +4113,9 @@ UpdateBackgroundTask(BackgroundTask *task) task->jobid, task->taskid))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); heap_deform_tuple(heapTuple, tupleDescriptor, values, isnull); @@ -4164,6 +4184,10 @@ UpdateBackgroundTask(BackgroundTask *task) systable_endscan(scanDescriptor); table_close(pgDistBackgroundTasks, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -4251,9 +4275,10 @@ CancelTasksForJob(int64 jobid) HeapTuple taskTuple = NULL; while (HeapTupleIsValid(taskTuple = systable_getnext(scanDescriptor))) { - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool nulls[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *nulls = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + heap_deform_tuple(taskTuple, tupleDescriptor, values, nulls); Oid statusOid = @@ -4302,6 +4327,10 @@ CancelTasksForJob(int64 jobid) taskTuple = heap_modify_tuple(taskTuple, tupleDescriptor, values, nulls, replace); CatalogTupleUpdate(pgDistBackgroundTasks, &taskTuple->t_self, taskTuple); + + pfree(values); + pfree(nulls); + pfree(replace); } systable_endscan(scanDescriptor); @@ -4358,9 +4387,9 @@ UnscheduleDependentTasks(BackgroundTask *task) "task_id: " UINT64_FORMAT, cTaskId))); } - Datum values[Natts_pg_dist_background_task] = { 0 }; - bool isnull[Natts_pg_dist_background_task] = { 0 }; - bool replace[Natts_pg_dist_background_task] = { 0 }; + Datum *values = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = (bool *) palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); values[Anum_pg_dist_background_task_status - 1] = ObjectIdGetDatum(CitusTaskStatusUnscheduledId()); @@ -4372,6 +4401,10 @@ UnscheduleDependentTasks(BackgroundTask *task) CatalogTupleUpdate(pgDistBackgroundTasks, &heapTuple->t_self, heapTuple); systable_endscan(scanDescriptor); + + pfree(values); + pfree(isnull); + pfree(replace); } } @@ -4457,3 +4490,23 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc) ? (Anum_pg_dist_partition_autoconverted - 1) : tupleDesc->natts - 1; } + + +/* + * GetNodesInvolvedAttrIndexInPgDistBackgroundTask returns attrnum for nodes_involved attr. + * + * nodes_involved attr was added to table pg_dist_background_task using alter operation after + * the version where Citus started supporting downgrades, and it's only column that we've + * introduced to pg_dist_background_task since then. + * + * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than + * Natts_pg_dist_background_task and when this happens, then we know that attrnum nodes_involved is + * not Anum_pg_dist_background_task_nodes_involved anymore but tupleDesc->natts - 1. + */ +int +GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc) +{ + return TupleDescSize(tupleDesc) == Natts_pg_dist_background_task + ? (Anum_pg_dist_background_task_nodes_involved - 1) + : tupleDesc->natts - 1; +} diff --git a/src/include/distributed/pg_dist_background_task.h b/src/include/distributed/pg_dist_background_task.h index 9e6673a64..60e30e638 100644 --- a/src/include/distributed/pg_dist_background_task.h +++ b/src/include/distributed/pg_dist_background_task.h @@ -27,4 +27,6 @@ #define Anum_pg_dist_background_task_message 9 #define Anum_pg_dist_background_task_nodes_involved 10 +extern int GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc); + #endif /* CITUS_PG_DIST_BACKGROUND_TASK_H */