mirror of https://github.com/citusdata/citus.git
Fix downgrade and following upgrade
parent
a7e686c106
commit
c9d0ae2d5d
|
@ -346,12 +346,12 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
if (isNullArray[Anum_pg_dist_partition_partmethod - 1] ||
|
if (isNullArray[Anum_pg_dist_partition_partmethod - 1] ||
|
||||||
|
@ -363,6 +363,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
||||||
*/
|
*/
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,6 +376,8 @@ CdcIsReferenceTableViaCatalog(Oid relationId)
|
||||||
|
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A table is a reference table when its partition method is 'none'
|
* A table is a reference table when its partition method is 'none'
|
||||||
|
|
|
@ -729,12 +729,13 @@ PartitionMethodViaCatalog(Oid relationId)
|
||||||
return DISTRIBUTE_BY_INVALID;
|
return DISTRIBUTE_BY_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
|
||||||
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
if (isNullArray[Anum_pg_dist_partition_partmethod - 1])
|
if (isNullArray[Anum_pg_dist_partition_partmethod - 1])
|
||||||
|
@ -742,6 +743,8 @@ PartitionMethodViaCatalog(Oid relationId)
|
||||||
/* partition method cannot be NULL, still let's make sure */
|
/* partition method cannot be NULL, still let's make sure */
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
return DISTRIBUTE_BY_INVALID;
|
return DISTRIBUTE_BY_INVALID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,6 +753,8 @@ PartitionMethodViaCatalog(Oid relationId)
|
||||||
|
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
return partitionMethodChar;
|
return partitionMethodChar;
|
||||||
}
|
}
|
||||||
|
@ -768,12 +773,12 @@ PartitionColumnViaCatalog(Oid relationId)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
if (isNullArray[Anum_pg_dist_partition_partkey - 1])
|
if (isNullArray[Anum_pg_dist_partition_partkey - 1])
|
||||||
|
@ -781,6 +786,8 @@ PartitionColumnViaCatalog(Oid relationId)
|
||||||
/* partition key cannot be NULL, still let's make sure */
|
/* partition key cannot be NULL, still let's make sure */
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -795,6 +802,8 @@ PartitionColumnViaCatalog(Oid relationId)
|
||||||
|
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
return partitionColumn;
|
return partitionColumn;
|
||||||
}
|
}
|
||||||
|
@ -813,12 +822,13 @@ ColocationIdViaCatalog(Oid relationId)
|
||||||
return INVALID_COLOCATION_ID;
|
return INVALID_COLOCATION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
|
||||||
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
|
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
|
||||||
|
@ -826,6 +836,8 @@ ColocationIdViaCatalog(Oid relationId)
|
||||||
/* colocation id cannot be NULL, still let's make sure */
|
/* colocation id cannot be NULL, still let's make sure */
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
return INVALID_COLOCATION_ID;
|
return INVALID_COLOCATION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -834,6 +846,8 @@ ColocationIdViaCatalog(Oid relationId)
|
||||||
|
|
||||||
heap_freetuple(partitionTuple);
|
heap_freetuple(partitionTuple);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
return colocationId;
|
return colocationId;
|
||||||
}
|
}
|
||||||
|
@ -1741,10 +1755,11 @@ BuildCitusTableCacheEntry(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry =
|
CitusTableCacheEntry *cacheEntry =
|
||||||
|
@ -1797,7 +1812,7 @@ BuildCitusTableCacheEntry(Oid relationId)
|
||||||
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
|
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isNullArray[Anum_pg_dist_partition_autoconverted - 1])
|
if (isNullArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)])
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We don't expect this to happen, but set it to false (the default value)
|
* We don't expect this to happen, but set it to false (the default value)
|
||||||
|
@ -1808,7 +1823,7 @@ BuildCitusTableCacheEntry(Oid relationId)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
cacheEntry->autoConverted = DatumGetBool(
|
cacheEntry->autoConverted = DatumGetBool(
|
||||||
datumArray[Anum_pg_dist_partition_autoconverted - 1]);
|
datumArray[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)]);
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_freetuple(distPartitionTuple);
|
heap_freetuple(distPartitionTuple);
|
||||||
|
@ -1852,6 +1867,9 @@ BuildCitusTableCacheEntry(Oid relationId)
|
||||||
|
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
cacheEntry->isValid = true;
|
cacheEntry->isValid = true;
|
||||||
|
|
||||||
return cacheEntry;
|
return cacheEntry;
|
||||||
|
@ -5013,8 +5031,8 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
while (HeapTupleIsValid(heapTuple))
|
while (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
||||||
|
@ -5036,6 +5054,8 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
||||||
}
|
}
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
|
@ -573,10 +573,14 @@ FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc
|
||||||
{
|
{
|
||||||
Assert(heapTuple->t_tableOid == DistPartitionRelationId());
|
Assert(heapTuple->t_tableOid == DistPartitionRelationId());
|
||||||
|
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
Datum* datumArray = (Datum *) palloc0(tupleDesc->natts * sizeof(Datum));
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
bool* isNullArray = (bool *) palloc0(tupleDesc->natts * sizeof(bool));
|
||||||
|
|
||||||
heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray);
|
heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray);
|
||||||
|
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
||||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||||
|
|
||||||
|
|
|
@ -1919,23 +1919,21 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
{
|
{
|
||||||
char *distributionColumnString = NULL;
|
char *distributionColumnString = NULL;
|
||||||
|
|
||||||
Datum newValues[Natts_pg_dist_partition];
|
|
||||||
bool newNulls[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
/* open system catalog and insert new tuple */
|
/* open system catalog and insert new tuple */
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||||
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
|
||||||
|
Datum* newValues = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* newNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
/* form new tuple for pg_dist_partition */
|
/* form new tuple for pg_dist_partition */
|
||||||
memset(newValues, 0, sizeof(newValues));
|
|
||||||
memset(newNulls, false, sizeof(newNulls));
|
|
||||||
|
|
||||||
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
|
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
|
||||||
ObjectIdGetDatum(relationId);
|
ObjectIdGetDatum(relationId);
|
||||||
newValues[Anum_pg_dist_partition_partmethod - 1] =
|
newValues[Anum_pg_dist_partition_partmethod - 1] =
|
||||||
CharGetDatum(distributionMethod);
|
CharGetDatum(distributionMethod);
|
||||||
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||||
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
|
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
|
||||||
newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
newValues[GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor)] = BoolGetDatum(autoConverted);
|
||||||
|
|
||||||
/* set partkey column to NULL for reference tables */
|
/* set partkey column to NULL for reference tables */
|
||||||
if (distributionMethod != DISTRIBUTE_BY_NONE)
|
if (distributionMethod != DISTRIBUTE_BY_NONE)
|
||||||
|
@ -1951,7 +1949,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
|
newNulls[Anum_pg_dist_partition_partkey - 1] = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
HeapTuple newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
|
HeapTuple newTuple = heap_form_tuple(tupleDescriptor, newValues,
|
||||||
newNulls);
|
newNulls);
|
||||||
|
|
||||||
/* finally insert tuple, build index entries & register cache invalidation */
|
/* finally insert tuple, build index entries & register cache invalidation */
|
||||||
|
@ -1963,6 +1961,9 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(newValues);
|
||||||
|
pfree(newNulls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2154,13 +2155,13 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
Datum values[Natts_pg_dist_placement];
|
|
||||||
bool isnull[Natts_pg_dist_placement];
|
|
||||||
bool replace[Natts_pg_dist_placement];
|
|
||||||
bool colIsNull = false;
|
bool colIsNull = false;
|
||||||
|
|
||||||
Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock);
|
Relation pgDistPlacement = table_open(DistPlacementRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPlacement);
|
||||||
|
Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_placement_placementid,
|
||||||
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
|
BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId));
|
||||||
|
|
||||||
|
@ -2177,8 +2178,6 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
||||||
placementId)));
|
placementId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
|
values[Anum_pg_dist_placement_groupid - 1] = Int32GetDatum(groupId);
|
||||||
isnull[Anum_pg_dist_placement_groupid - 1] = false;
|
isnull[Anum_pg_dist_placement_groupid - 1] = false;
|
||||||
replace[Anum_pg_dist_placement_groupid - 1] = true;
|
replace[Anum_pg_dist_placement_groupid - 1] = true;
|
||||||
|
@ -2197,6 +2196,10 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistPlacement, NoLock);
|
table_close(pgDistPlacement, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2210,12 +2213,14 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
Datum values[Natts_pg_dist_partition];
|
int autoconvertedindex;
|
||||||
bool isnull[Natts_pg_dist_partition];
|
|
||||||
bool replace[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId));
|
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(citusTableId));
|
||||||
|
|
||||||
|
@ -2231,11 +2236,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
||||||
citusTableId)));
|
citusTableId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||||
|
values[autoconvertedindex] = BoolGetDatum(autoConverted);
|
||||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
isnull[autoconvertedindex] = false;
|
||||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
replace[autoconvertedindex] = true;
|
||||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
|
||||||
|
|
||||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||||
|
|
||||||
|
@ -2247,6 +2251,10 @@ UpdatePgDistPartitionAutoConverted(Oid citusTableId, bool autoConverted)
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2286,12 +2294,14 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
Datum values[Natts_pg_dist_partition];
|
int autoconvertedindex;
|
||||||
bool isnull[Natts_pg_dist_partition];
|
|
||||||
bool replace[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||||
|
|
||||||
|
@ -2307,8 +2317,6 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
||||||
relationId)));
|
relationId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
|
|
||||||
replace[Anum_pg_dist_partition_partmethod - 1] = true;
|
replace[Anum_pg_dist_partition_partmethod - 1] = true;
|
||||||
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod);
|
values[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod);
|
||||||
isnull[Anum_pg_dist_partition_partmethod - 1] = false;
|
isnull[Anum_pg_dist_partition_partmethod - 1] = false;
|
||||||
|
@ -2317,9 +2325,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
||||||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||||
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||||
|
|
||||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(false);
|
replace[autoconvertedindex] = true;
|
||||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
values[autoconvertedindex] = BoolGetDatum(false);
|
||||||
|
isnull[autoconvertedindex] = false;
|
||||||
|
|
||||||
char *distributionColumnString = nodeToString((Node *) distributionColumn);
|
char *distributionColumnString = nodeToString((Node *) distributionColumn);
|
||||||
|
|
||||||
|
@ -2337,6 +2346,10 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -2380,12 +2393,14 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
Datum values[Natts_pg_dist_partition];
|
int autoconvertedindex;
|
||||||
bool isnull[Natts_pg_dist_partition];
|
|
||||||
bool replace[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isnull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
|
||||||
|
|
||||||
|
@ -2401,8 +2416,6 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
||||||
relationId)));
|
relationId)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||||
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||||
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
||||||
|
@ -2411,9 +2424,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
||||||
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
|
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
|
||||||
replace[Anum_pg_dist_partition_repmodel - 1] = true;
|
replace[Anum_pg_dist_partition_repmodel - 1] = true;
|
||||||
|
|
||||||
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
|
autoconvertedindex = GetAutoConvertedAttrIndexInPgDistPartition(tupleDescriptor);
|
||||||
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
|
values[autoconvertedindex] = BoolGetDatum(autoConverted);
|
||||||
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
|
isnull[autoconvertedindex] = false;
|
||||||
|
replace[autoconvertedindex] = true;
|
||||||
|
|
||||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
|
||||||
|
|
||||||
|
@ -2424,6 +2438,10 @@ UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 coloca
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -4401,3 +4419,17 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
|
||||||
|
|
||||||
table_close(pgDistBackgroundTasksDepend, NoLock);
|
table_close(pgDistBackgroundTasksDepend, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AutoConverted attr was added to table pg_dist_partition using alter operation, so
|
||||||
|
* in case of downgrade we can get additional tuple field in tuple desc with dropped flag
|
||||||
|
* and in case of futher upgrade we can get number of fields > Natts_pg_dist_partition.
|
||||||
|
* So we should properly address AutoConverted attr in arrays.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc)
|
||||||
|
{
|
||||||
|
return TupleDescSize(tupleDEsc) == Natts_pg_dist_partition
|
||||||
|
? (Anum_pg_dist_partition_autoconverted - 1)
|
||||||
|
: tupleDEsc->natts - 1;
|
||||||
|
}
|
|
@ -815,13 +815,14 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
||||||
bool indexOK = true;
|
bool indexOK = true;
|
||||||
int scanKeyCount = 1;
|
int scanKeyCount = 1;
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
Datum values[Natts_pg_dist_partition];
|
|
||||||
bool isNull[Natts_pg_dist_partition];
|
|
||||||
bool replace[Natts_pg_dist_partition];
|
|
||||||
|
|
||||||
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
|
|
||||||
|
Datum* values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool* isNull = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool* replace = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
|
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
|
||||||
|
|
||||||
|
@ -838,10 +839,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
||||||
distributedRelationName)));
|
distributedRelationName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(values, 0, sizeof(values));
|
|
||||||
memset(isNull, false, sizeof(isNull));
|
|
||||||
memset(replace, false, sizeof(replace));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
|
||||||
isNull[Anum_pg_dist_partition_colocationid - 1] = false;
|
isNull[Anum_pg_dist_partition_colocationid - 1] = false;
|
||||||
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
replace[Anum_pg_dist_partition_colocationid - 1] = true;
|
||||||
|
@ -858,6 +855,10 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId,
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistPartition, NoLock);
|
table_close(pgDistPartition, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isNull);
|
||||||
|
pfree(replace);
|
||||||
|
|
||||||
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
|
bool shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
|
||||||
if (shouldSyncMetadata && !localOnly)
|
if (shouldSyncMetadata && !localOnly)
|
||||||
{
|
{
|
||||||
|
@ -1000,14 +1001,16 @@ ColocationGroupTableList(uint32 colocationId, uint32 count)
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
while (HeapTupleIsValid(heapTuple))
|
while (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
Oid colocatedTableId = DatumGetObjectId(
|
Oid colocatedTableId = DatumGetObjectId(
|
||||||
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
||||||
|
|
||||||
colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
|
colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
|
|
||||||
if (count == 0)
|
if (count == 0)
|
||||||
{
|
{
|
||||||
|
@ -1194,8 +1197,8 @@ ColocatedTableId(int32 colocationId)
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
while (HeapTupleIsValid(heapTuple))
|
while (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
bool isNullArray[Natts_pg_dist_partition];
|
Datum* datumArray = (Datum *) palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
Datum datumArray[Natts_pg_dist_partition];
|
bool* isNullArray = (bool *) palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
colocatedTableId = DatumGetObjectId(
|
colocatedTableId = DatumGetObjectId(
|
||||||
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
datumArray[Anum_pg_dist_partition_logicalrelid - 1]);
|
||||||
|
@ -1222,6 +1225,8 @@ ColocatedTableId(int32 colocationId)
|
||||||
colocatedTableId = InvalidOid;
|
colocatedTableId = InvalidOid;
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
pfree(datumArray);
|
||||||
|
pfree(isNullArray);
|
||||||
}
|
}
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
|
|
|
@ -466,4 +466,5 @@ extern bool IsBackgroundJobStatusTerminal(BackgroundJobStatus status);
|
||||||
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
|
extern bool IsBackgroundTaskStatusTerminal(BackgroundTaskStatus status);
|
||||||
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
|
extern Oid BackgroundJobStatusOid(BackgroundJobStatus status);
|
||||||
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
|
extern Oid BackgroundTaskStatusOid(BackgroundTaskStatus status);
|
||||||
|
extern int GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDEsc);
|
||||||
#endif /* METADATA_UTILITY_H */
|
#endif /* METADATA_UTILITY_H */
|
||||||
|
|
Loading…
Reference in New Issue