mirror of https://github.com/citusdata/citus.git
Fix memory corruptions that could happen when a Citus downgrade is followed by an upgrade (#7950)
DESCRIPTION: Fixes potential memory corruptions that could happen when a Citus downgrade is followed by a Citus upgrade. In case of citus downgrade and further upgrade citus crash with core dump. The reason is that citus hardcoded number of columns in pg_dist_partition table, but in case of downgrade and following update table can have more columns, and some of then can be marked as dropped. Patch suggest decision for this problem with using tupleDescriptor->nattrs(postgres internal approach). Fixes #7933. --------- Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>ihalatci-dependency-updates
parent
c183634207
commit
f0789bd388
|
|
@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
|||
return false;
|
||||
}
|
||||
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_partmethod - 1] ||
|
||||
|
|
@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
|||
*/
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
|||
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
/*
|
||||
* A table is a reference table when its partition method is 'none'
|
||||
|
|
|
|||
|
|
@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId)
|
|||
return DISTRIBUTE_BY_INVALID;
|
||||
}
|
||||
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_partmethod - 1])
|
||||
|
|
@ -742,6 +743,8 @@ PartitionMethodViaCatalog(Oid relationId)
|
|||
/* partition method cannot be NULL, still let's make sure */
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
return DISTRIBUTE_BY_INVALID;
|
||||
}
|
||||
|
||||
|
|
@ -750,6 +753,8 @@ PartitionMethodViaCatalog(Oid relationId)
|
|||
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
return partitionMethodChar;
|
||||
}
|
||||
|
|
@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId)
|
|||
return NULL;
|
||||
}
|
||||
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_partkey - 1])
|
||||
|
|
@ -781,6 +786,8 @@ PartitionColumnViaCatalog(Oid relationId)
|
|||
/* partition key cannot be NULL, still let's make sure */
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
@ -795,6 +802,8 @@ PartitionColumnViaCatalog(Oid relationId)
|
|||
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
return partitionColumn;
|
||||
}
|
||||
|
|
@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId)
|
|||
return INVALID_COLOCATION_ID;
|
||||
}
|
||||
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
|
||||
|
|
@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId)
|
|||
/* colocation id cannot be NULL, still let's make sure */
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
return INVALID_COLOCATION_ID;
|
||||
}
|
||||
|
||||
|
|
@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId)
|
|||
|
||||
heap_freetuple(partitionTuple);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
return colocationId;
|
||||
}
|
||||
|
|
@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId)
|
|||
}
|
||||
|
||||
MemoryContext oldContext = NULL;
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
CitusTableCacheEntry *cacheEntry =
|
||||
|
|
@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId)
|
|||
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
|
||||
}
|
||||
|
||||
if (isNullArray[Anum_pg_dist_partition_autoconverted - 1])
|
||||
if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)])
|
||||
{
|
||||
/*
|
||||
* We don't expect this to happen, but set it to false (the default value)
|
||||
|
|
@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId)
|
|||
else
|
||||
{
|
||||
cacheEntry->autoConverted = DatumGetBool(
|
||||
datumArray[Anum_pg_dist_partition_autoconverted - 1]);
|
||||
datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]);
|
||||
}
|
||||
|
||||
heap_freetuple(distPartitionTuple);
|
||||
|
|
@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId)
|
|||
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
cacheEntry->isValid = true;
|
||||
|
||||
return cacheEntry;
|
||||
|
|
@ -5011,10 +5029,13 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
|||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
|
||||
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
|
||||
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
||||
|
|
@ -5038,6 +5059,9 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
|||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, AccessShareLock);
|
||||
|
||||
|
|
|
|||
|
|
@ -573,13 +573,17 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc
|
|||
{
|
||||
Assert(heapTuple->t_tableOid == DistPartitionRelationId());
|
||||
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
Datum *datumArray = (Datum *) palloc(tupleDesc->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDesc->natts * sizeof(bool));
|
||||
|
||||
heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray);
|
||||
|
||||
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
return relationId;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -812,6 +812,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
|
|||
{
|
||||
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
|
||||
}
|
||||
|
||||
/* for non-partitioned tables, we will use Postgres' size functions */
|
||||
else
|
||||
{
|
||||
|
|
@ -1919,23 +1920,22 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
|||
{
|
||||
char *distributionColumnString = NULL;
|
||||
|
||||
Datum newValues[Natts_pg_dist_partition];
|
||||
bool newNulls[Natts_pg_dist_partition];
|
||||
|
||||
/* open system catalog and insert new tuple */
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
Datum *newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
/* form new tuple for pg_dist_partition */
|
||||
memset(newValues, 0, sizeof(newValues));
|
||||
memset(newNulls, false, sizeof(newNulls));
|
||||
|
||||
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
|
||||
ObjectIdGetDatum(relationId);
|
||||
newValues[Anum_pg_dist_partition_partmethod - 1] =
|
||||
CharGetDatum(distributionMethod);
|
||||
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
|
||||
newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
||||
newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] =
|
||||
BoolGetDatum(autoConverted);
|
||||
|
||||
/* set partkey column to NULL for reference tables */
|
||||
if (distributionMethod != DISTRIBUTE_BY_NONE)
|
||||
|
|
@ -1951,7 +1951,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
|||
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
|
||||
}
|
||||
|
||||
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
|
||||
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, newValues,
|
||||
newNulls);
|
||||
|
||||
/* finally insert tuple, build index entries & register cache invalidation */
|
||||
|
|
@ -1963,6 +1963,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
|||
|
||||
CommandCounterIncrement();
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(newValues);
|
||||
pfree(newNulls);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -2154,13 +2157,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
|||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
bool indexOK = true;
|
||||
Datum values[Natts_pg_dist_placement];
|
||||
bool isnull[Natts_pg_dist_placement];
|
||||
bool replace[Natts_pg_dist_placement];
|
||||
bool colIsNull = false;
|
||||
|
||||
Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
|
||||
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
|
||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
|
||||
|
||||
|
|
@ -2177,8 +2180,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
|||
placementId)));
|
||||
}
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
|
||||
isnull[Anum_pg_dist_placement_groupid - 1] = false;
|
||||
replace[Anum_pg_dist_placement_groupid - 1] = true;
|
||||
|
|
@ -2197,6 +2198,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
|||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPlacement, NoLock);
|
||||
|
||||
pfree(values);
|
||||
pfree(isnull);
|
||||
pfree(replace);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -2210,12 +2215,13 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
|||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
bool indexOK = true;
|
||||
Datum values[Natts_pg_dist_partition];
|
||||
bool isnull[Natts_pg_dist_partition];
|
||||
bool replace[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId));
|
||||
|
||||
|
|
@ -2231,11 +2237,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
|||
citusTableId)));
|
||||
}
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
||||
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||
values[autoconvertedindex] = BoolGetDatum(autoConverted);
|
||||
isnull[autoconvertedindex] = false;
|
||||
replace[autoconvertedindex] = true;
|
||||
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||
|
||||
|
|
@ -2247,6 +2252,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
|||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(values);
|
||||
pfree(isnull);
|
||||
pfree(replace);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -2286,12 +2295,13 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
|||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
bool indexOK = true;
|
||||
Datum values[Natts_pg_dist_partition];
|
||||
bool isnull[Natts_pg_dist_partition];
|
||||
bool replace[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||
|
||||
|
|
@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
|||
relationId)));
|
||||
}
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
replace[Anum_pg_dist_partition_partmethod - 1] = true;
|
||||
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod);
|
||||
isnull[Anum_pg_dist_partition_partmethod - 1] = false;
|
||||
|
|
@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
|||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||
|
||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false);
|
||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
||||
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||
replace[autoconvertedindex] = true;
|
||||
values[autoconvertedindex] = BoolGetDatum(false);
|
||||
isnull[autoconvertedindex] = false;
|
||||
|
||||
char *distributionColumnString = nodeToString((Node *) distributionColumn);
|
||||
|
||||
|
|
@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
|||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(values);
|
||||
pfree(isnull);
|
||||
pfree(replace);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -2380,12 +2393,13 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
|||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
bool indexOK = true;
|
||||
Datum values[Natts_pg_dist_partition];
|
||||
bool isnull[Natts_pg_dist_partition];
|
||||
bool replace[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||
|
||||
|
|
@ -2401,8 +2415,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
|||
relationId)));
|
||||
}
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
||||
|
|
@ -2411,9 +2423,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
|||
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
|
||||
replace[Anum_pg_dist_partition_repmodel - 1] = true;
|
||||
|
||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
||||
int autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||
values[autoconvertedindex] = BoolGetDatum(autoConverted);
|
||||
isnull[autoconvertedindex] = false;
|
||||
replace[autoconvertedindex] = true;
|
||||
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||
|
||||
|
|
@ -2424,6 +2437,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
|||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(values);
|
||||
pfree(isnull);
|
||||
pfree(replace);
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -3149,8 +3166,8 @@ ScheduleBackgroundTask(int64 jobId, Oid owner, char *command, int dependingTaskC
|
|||
|
||||
values[Anum_pg_dist_background_task_nodes_involved - 1] =
|
||||
IntArrayToDatum(nodesInvolvedCount, nodesInvolved);
|
||||
nulls[Anum_pg_dist_background_task_nodes_involved - 1] = (nodesInvolvedCount ==
|
||||
0);
|
||||
nulls[Anum_pg_dist_background_task_nodes_involved - 1] =
|
||||
(nodesInvolvedCount == 0);
|
||||
|
||||
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistBackgroundTask),
|
||||
values, nulls);
|
||||
|
|
@ -4420,3 +4437,23 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
|
|||
|
||||
table_close(pgDistBackgroundTasksDepend, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAutoConvertedAttrIndexInPgDistPartition returns attrnum for autoconverted attr.
|
||||
*
|
||||
* autoconverted attr was added to table pg_dist_partition using alter operation after
|
||||
* the version where Citus started supporting downgrades, and it's only column that we've
|
||||
* introduced to pg_dist_partition since then.
|
||||
*
|
||||
* And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
|
||||
* Natts_pg_dist_partition and when this happens, then we know that attrnum autoconverted is
|
||||
* not Anum_pg_dist_partition_autoconverted anymore but tupleDesc->natts - 1.
|
||||
*/
|
||||
int
|
||||
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
|
||||
{
|
||||
return TupleDescSize(tupleDesc) == Natts_pg_dist_partition
|
||||
? (Anum_pg_dist_partition_autoconverted - 1)
|
||||
: tupleDesc->natts - 1;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
|||
bool indexOK = true;
|
||||
int scanKeyCount = 1;
|
||||
ScanKeyData scanKey[1];
|
||||
Datum values[Natts_pg_dist_partition];
|
||||
bool isNull[Natts_pg_dist_partition];
|
||||
bool replace[Natts_pg_dist_partition];
|
||||
|
||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
bool *replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
|
||||
|
||||
|
|
@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
|||
distributedRelationName)));
|
||||
}
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(isNull, false, sizeof(isNull));
|
||||
memset(replace, false, sizeof(replace));
|
||||
|
||||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||
isNull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
||||
|
|
@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
|||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, NoLock);
|
||||
|
||||
pfree(values);
|
||||
pfree(isNull);
|
||||
pfree(replace);
|
||||
|
||||
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
|
||||
if (shouldSyncMetadata && !localOnly)
|
||||
{
|
||||
|
|
@ -998,10 +999,12 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
|
|||
indexOK, NULL, scanKeyCount, scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
|
||||
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
Oid colocatedTableId = DatumGetObjectId(
|
||||
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
||||
|
|
@ -1020,6 +1023,8 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
|
|||
break;
|
||||
}
|
||||
}
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, AccessShareLock);
|
||||
|
|
@ -1192,10 +1197,12 @@ ColocatedTableId(int32 colocationId)
|
|||
indexOK, NULL, scanKeyCount, scanKey);
|
||||
|
||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||
Datum *datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||
bool *isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
memset(datumArray, 0, tupleDescriptor->natts * sizeof(Datum));
|
||||
memset(isNullArray, 0, tupleDescriptor->natts * sizeof(bool));
|
||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||
colocatedTableId = DatumGetObjectId(
|
||||
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
||||
|
|
@ -1223,6 +1230,8 @@ ColocatedTableId(int32 colocationId)
|
|||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
pfree(datumArray);
|
||||
pfree(isNullArray);
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistPartition, AccessShareLock);
|
||||
|
|
|
|||
|
|
@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
|
|||
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
|
||||
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
|
||||
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
|
||||
extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc);
|
||||
#endif /* METADATA_UTILITY_H */
|
||||
|
|
|
|||
Loading…
Reference in New Issue