mirror of https://github.com/citusdata/citus.git
Use heap_deform_tuple() instead of calling heap_getattr(). (#2464)
After Fast ALTER TABLE ADD COLUMN with a non-NULL default in PG11, physical heaps might not contain all attributes after a ALTER TABLE ADD COLUMN happens. heap_getattr() returns NULL when the physical tuple doesn't contain an attribute. So we should use heap_deform_tuple() in these cases, which fills in the missing attributes. Our catalog tables evolve over time, and an upgrade might involve some ALTER TABLE ADD COLUMN commands. Note that we don't need to worry about postgres catalog tables and we can use heap_getattr() for them, because they only change between major versions. This also fixes #2453.pull/2470/head
parent
3616939dfa
commit
d3e284dcd6
|
@ -827,30 +827,32 @@ static GroupShardPlacement *
|
||||||
TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
TupleToGroupShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
||||||
{
|
{
|
||||||
GroupShardPlacement *shardPlacement = NULL;
|
GroupShardPlacement *shardPlacement = NULL;
|
||||||
bool isNull = false;
|
bool isNullArray[Natts_pg_dist_placement];
|
||||||
|
Datum datumArray[Natts_pg_dist_placement];
|
||||||
|
|
||||||
Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_placement_placementid,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_placement_shardid,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_placement_shardlength,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum shardState = heap_getattr(heapTuple, Anum_pg_dist_placement_shardstate,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_placement_groupid,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
|
if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_placement ||
|
||||||
HeapTupleHasNulls(heapTuple))
|
HeapTupleHasNulls(heapTuple))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
|
ereport(ERROR, (errmsg("unexpected null in pg_dist_placement tuple")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We use heap_deform_tuple() instead of heap_getattr() to expand tuple
|
||||||
|
* to contain missing values when ALTER TABLE ADD COLUMN happens.
|
||||||
|
*/
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
shardPlacement = CitusMakeNode(GroupShardPlacement);
|
shardPlacement = CitusMakeNode(GroupShardPlacement);
|
||||||
shardPlacement->placementId = DatumGetInt64(placementId);
|
shardPlacement->placementId = DatumGetInt64(
|
||||||
shardPlacement->shardId = DatumGetInt64(shardId);
|
datumArray[Anum_pg_dist_placement_placementid - 1]);
|
||||||
shardPlacement->shardLength = DatumGetInt64(shardLength);
|
shardPlacement->shardId = DatumGetInt64(
|
||||||
shardPlacement->shardState = DatumGetUInt32(shardState);
|
datumArray[Anum_pg_dist_placement_shardid - 1]);
|
||||||
shardPlacement->groupId = DatumGetUInt32(groupId);
|
shardPlacement->shardLength = DatumGetInt64(
|
||||||
|
datumArray[Anum_pg_dist_placement_shardlength - 1]);
|
||||||
|
shardPlacement->shardState = DatumGetUInt32(
|
||||||
|
datumArray[Anum_pg_dist_placement_shardstate - 1]);
|
||||||
|
shardPlacement->groupId = DatumGetUInt32(
|
||||||
|
datumArray[Anum_pg_dist_placement_groupid - 1]);
|
||||||
|
|
||||||
return shardPlacement;
|
return shardPlacement;
|
||||||
}
|
}
|
||||||
|
|
|
@ -889,13 +889,13 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
HeapTuple distPartitionTuple = NULL;
|
HeapTuple distPartitionTuple = NULL;
|
||||||
Relation pgDistPartition = NULL;
|
Relation pgDistPartition = NULL;
|
||||||
Form_pg_dist_partition partitionForm = NULL;
|
|
||||||
Datum partitionKeyDatum = 0;
|
Datum partitionKeyDatum = 0;
|
||||||
Datum replicationModelDatum = 0;
|
Datum replicationModelDatum = 0;
|
||||||
MemoryContext oldContext = NULL;
|
MemoryContext oldContext = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
bool isNull = false;
|
|
||||||
bool partitionKeyIsNull = false;
|
bool partitionKeyIsNull = false;
|
||||||
|
Datum datumArray[Natts_pg_dist_partition];
|
||||||
|
bool isNullArray[Natts_pg_dist_partition];
|
||||||
|
|
||||||
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
||||||
distPartitionTuple =
|
distPartitionTuple =
|
||||||
|
@ -912,14 +912,11 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
cacheEntry->isDistributedTable = true;
|
cacheEntry->isDistributedTable = true;
|
||||||
|
|
||||||
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
partitionForm = (Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
|
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
cacheEntry->partitionMethod = partitionForm->partmethod;
|
cacheEntry->partitionMethod = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
||||||
|
partitionKeyDatum = datumArray[Anum_pg_dist_partition_partkey - 1];
|
||||||
partitionKeyDatum = heap_getattr(distPartitionTuple,
|
partitionKeyIsNull = isNullArray[Anum_pg_dist_partition_partkey - 1];
|
||||||
Anum_pg_dist_partition_partkey,
|
|
||||||
tupleDescriptor,
|
|
||||||
&partitionKeyIsNull);
|
|
||||||
|
|
||||||
/* note that for reference tables partitionKeyisNull is true */
|
/* note that for reference tables partitionKeyisNull is true */
|
||||||
if (!partitionKeyIsNull)
|
if (!partitionKeyIsNull)
|
||||||
|
@ -944,20 +941,14 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
cacheEntry->partitionKeyString = NULL;
|
cacheEntry->partitionKeyString = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheEntry->colocationId = heap_getattr(distPartitionTuple,
|
cacheEntry->colocationId = datumArray[Anum_pg_dist_partition_colocationid - 1];
|
||||||
Anum_pg_dist_partition_colocationid,
|
if (isNullArray[Anum_pg_dist_partition_colocationid - 1])
|
||||||
tupleDescriptor,
|
|
||||||
&isNull);
|
|
||||||
if (isNull)
|
|
||||||
{
|
{
|
||||||
cacheEntry->colocationId = INVALID_COLOCATION_ID;
|
cacheEntry->colocationId = INVALID_COLOCATION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
replicationModelDatum = heap_getattr(distPartitionTuple,
|
replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1];
|
||||||
Anum_pg_dist_partition_repmodel,
|
if (isNullArray[Anum_pg_dist_partition_repmodel - 1])
|
||||||
tupleDescriptor,
|
|
||||||
&isNull);
|
|
||||||
if (isNull)
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column
|
* repmodel is NOT NULL but before ALTER EXTENSION citus UPGRADE the column
|
||||||
|
@ -3437,26 +3428,17 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
|
||||||
int32 intervalTypeMod)
|
int32 intervalTypeMod)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
bool isNull = false;
|
|
||||||
bool minValueNull = false;
|
bool minValueNull = false;
|
||||||
bool maxValueNull = false;
|
bool maxValueNull = false;
|
||||||
Oid inputFunctionId = InvalidOid;
|
Oid inputFunctionId = InvalidOid;
|
||||||
Oid typeIoParam = InvalidOid;
|
Oid typeIoParam = InvalidOid;
|
||||||
Datum relationIdDatum = heap_getattr(heapTuple, Anum_pg_dist_shard_logicalrelid,
|
Datum datumArray[Natts_pg_dist_shard];
|
||||||
tupleDescriptor, &isNull);
|
bool isNullArray[Natts_pg_dist_shard];
|
||||||
Datum shardIdDatum = heap_getattr(heapTuple, Anum_pg_dist_shard_shardid,
|
Datum minValueTextDatum = 0;
|
||||||
tupleDescriptor, &isNull);
|
Datum maxValueTextDatum = 0;
|
||||||
Datum storageTypeDatum = heap_getattr(heapTuple, Anum_pg_dist_shard_shardstorage,
|
Oid relationId = InvalidOid;
|
||||||
tupleDescriptor, &isNull);
|
int64 shardId = InvalidOid;
|
||||||
|
char storageType = InvalidOid;
|
||||||
Datum minValueTextDatum = heap_getattr(heapTuple, Anum_pg_dist_shard_shardminvalue,
|
|
||||||
tupleDescriptor, &minValueNull);
|
|
||||||
Datum maxValueTextDatum = heap_getattr(heapTuple, Anum_pg_dist_shard_shardmaxvalue,
|
|
||||||
tupleDescriptor, &maxValueNull);
|
|
||||||
|
|
||||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
|
||||||
int64 shardId = DatumGetInt64(shardIdDatum);
|
|
||||||
char storageType = DatumGetChar(storageTypeDatum);
|
|
||||||
Datum minValue = 0;
|
Datum minValue = 0;
|
||||||
Datum maxValue = 0;
|
Datum maxValue = 0;
|
||||||
bool minValueExists = false;
|
bool minValueExists = false;
|
||||||
|
@ -3466,6 +3448,21 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
|
||||||
char intervalAlign = '0';
|
char intervalAlign = '0';
|
||||||
char intervalDelim = '0';
|
char intervalDelim = '0';
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We use heap_deform_tuple() instead of heap_getattr() to expand tuple
|
||||||
|
* to contain missing values when ALTER TABLE ADD COLUMN happens.
|
||||||
|
*/
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
|
relationId = DatumGetObjectId(datumArray[Anum_pg_dist_shard_logicalrelid - 1]);
|
||||||
|
shardId = DatumGetInt64(datumArray[Anum_pg_dist_shard_shardid - 1]);
|
||||||
|
storageType = DatumGetChar(datumArray[Anum_pg_dist_shard_shardstorage - 1]);
|
||||||
|
minValueTextDatum = datumArray[Anum_pg_dist_shard_shardminvalue - 1];
|
||||||
|
maxValueTextDatum = datumArray[Anum_pg_dist_shard_shardmaxvalue - 1];
|
||||||
|
|
||||||
|
minValueNull = isNullArray[Anum_pg_dist_shard_shardminvalue - 1];
|
||||||
|
maxValueNull = isNullArray[Anum_pg_dist_shard_shardmaxvalue - 1];
|
||||||
|
|
||||||
if (!minValueNull && !maxValueNull)
|
if (!minValueNull && !maxValueNull)
|
||||||
{
|
{
|
||||||
char *minValueString = TextDatumGetCString(minValueTextDatum);
|
char *minValueString = TextDatumGetCString(minValueTextDatum);
|
||||||
|
|
|
@ -1523,41 +1523,35 @@ static WorkerNode *
|
||||||
TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
bool isNull = false;
|
Datum datumArray[Natts_pg_dist_node];
|
||||||
|
bool isNullArray[Natts_pg_dist_node];
|
||||||
Datum nodeId = heap_getattr(heapTuple, Anum_pg_dist_node_nodeid,
|
char *nodeName = NULL;
|
||||||
tupleDescriptor, &isNull);
|
char *nodeRack = NULL;
|
||||||
Datum groupId = heap_getattr(heapTuple, Anum_pg_dist_node_groupid,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum nodeName = heap_getattr(heapTuple, Anum_pg_dist_node_nodename,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_node_nodeport,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum nodeRack = heap_getattr(heapTuple, Anum_pg_dist_node_noderack,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum hasMetadata = heap_getattr(heapTuple, Anum_pg_dist_node_hasmetadata,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum isActive = heap_getattr(heapTuple, Anum_pg_dist_node_isactive,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum nodeRole = heap_getattr(heapTuple, Anum_pg_dist_node_noderole,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
Datum nodeCluster = heap_getattr(heapTuple, Anum_pg_dist_node_nodecluster,
|
|
||||||
tupleDescriptor, &isNull);
|
|
||||||
|
|
||||||
Assert(!HeapTupleHasNulls(heapTuple));
|
Assert(!HeapTupleHasNulls(heapTuple));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We use heap_deform_tuple() instead of heap_getattr() to expand tuple
|
||||||
|
* to contain missing values when ALTER TABLE ADD COLUMN happens.
|
||||||
|
*/
|
||||||
|
heap_deform_tuple(heapTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
||||||
|
nodeName = DatumGetCString(datumArray[Anum_pg_dist_node_nodename - 1]);
|
||||||
|
nodeRack = DatumGetCString(datumArray[Anum_pg_dist_node_noderack - 1]);
|
||||||
|
|
||||||
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
|
workerNode = (WorkerNode *) palloc0(sizeof(WorkerNode));
|
||||||
workerNode->nodeId = DatumGetUInt32(nodeId);
|
workerNode->nodeId = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeid - 1]);
|
||||||
workerNode->workerPort = DatumGetUInt32(nodePort);
|
workerNode->workerPort = DatumGetUInt32(datumArray[Anum_pg_dist_node_nodeport - 1]);
|
||||||
workerNode->groupId = DatumGetUInt32(groupId);
|
workerNode->groupId = DatumGetUInt32(datumArray[Anum_pg_dist_node_groupid - 1]);
|
||||||
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
|
strlcpy(workerNode->workerName, TextDatumGetCString(nodeName), WORKER_LENGTH);
|
||||||
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
|
strlcpy(workerNode->workerRack, TextDatumGetCString(nodeRack), WORKER_LENGTH);
|
||||||
workerNode->hasMetadata = DatumGetBool(hasMetadata);
|
workerNode->hasMetadata = DatumGetBool(datumArray[Anum_pg_dist_node_hasmetadata - 1]);
|
||||||
workerNode->isActive = DatumGetBool(isActive);
|
workerNode->isActive = DatumGetBool(datumArray[Anum_pg_dist_node_isactive - 1]);
|
||||||
workerNode->nodeRole = DatumGetObjectId(nodeRole);
|
workerNode->nodeRole = DatumGetObjectId(datumArray[Anum_pg_dist_node_noderole - 1]);
|
||||||
|
|
||||||
{
|
{
|
||||||
Name nodeClusterName = DatumGetName(nodeCluster);
|
Name nodeClusterName = DatumGetName(datumArray[Anum_pg_dist_node_nodecluster -
|
||||||
|
1]);
|
||||||
char *nodeClusterString = NameStr(*nodeClusterName);
|
char *nodeClusterString = NameStr(*nodeClusterName);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
-- if the output of following query changes, we might need to change
|
||||||
|
-- some heap_getattr() calls to heap_deform_tuple(). This errors out in
|
||||||
|
-- postgres versions before 11.
|
||||||
|
SELECT attrelid::regclass, attname, atthasmissing, attmissingval
|
||||||
|
FROM pg_attribute
|
||||||
|
WHERE atthasmissing
|
||||||
|
ORDER BY attrelid, attname;
|
||||||
|
attrelid | attname | atthasmissing | attmissingval
|
||||||
|
--------------+-------------+---------------+---------------
|
||||||
|
pg_dist_node | hasmetadata | t | {f}
|
||||||
|
pg_dist_node | isactive | t | {t}
|
||||||
|
pg_dist_node | nodecluster | t | {default}
|
||||||
|
pg_dist_node | noderole | t | {primary}
|
||||||
|
(4 rows)
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
-- if the output of following query changes, we might need to change
|
||||||
|
-- some heap_getattr() calls to heap_deform_tuple(). This errors out in
|
||||||
|
-- postgres versions before 11.
|
||||||
|
SELECT attrelid::regclass, attname, atthasmissing, attmissingval
|
||||||
|
FROM pg_attribute
|
||||||
|
WHERE atthasmissing
|
||||||
|
ORDER BY attrelid, attname;
|
||||||
|
ERROR: column "atthasmissing" does not exist
|
||||||
|
LINE 1: SELECT attrelid::regclass, attname, atthasmissing, attmissin...
|
||||||
|
^
|
|
@ -22,6 +22,7 @@ test: multi_test_helpers
|
||||||
test: multi_table_ddl
|
test: multi_table_ddl
|
||||||
test: multi_name_lengths
|
test: multi_name_lengths
|
||||||
test: multi_metadata_access
|
test: multi_metadata_access
|
||||||
|
test: multi_metadata_attributes
|
||||||
|
|
||||||
test: multi_read_from_secondaries
|
test: multi_read_from_secondaries
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
|
||||||
|
-- if the output of following query changes, we might need to change
|
||||||
|
-- some heap_getattr() calls to heap_deform_tuple(). This errors out in
|
||||||
|
-- postgres versions before 11.
|
||||||
|
SELECT attrelid::regclass, attname, atthasmissing, attmissingval
|
||||||
|
FROM pg_attribute
|
||||||
|
WHERE atthasmissing
|
||||||
|
ORDER BY attrelid, attname;
|
Loading…
Reference in New Issue