Fix memory corruptions around pg_dist_node accessors after a Citus downgrade is followed by an upgrade (#8144)

Unlike what was fixed in #7950, #8120, #8124, #8121 and #8114, this
was not an issue in older releases; it is a potential issue that would
be introduced by the current (13.2) release, because one of the recent
commits (#8122) added two columns to pg_dist_node. In other
words, none of the older releases since we started supporting downgrades
added new columns to pg_dist_node.

The mentioned PR actually attempted to avoid these kinds of issues in one
of the code-paths, but not in some of the others.

So this PR avoids memory corruptions around pg_dist_node accessors in
a standardized way (as implemented in the other example PRs) and in all
code-paths.
pull/8146/head^2
Onur Tirtir 2025-08-22 14:07:44 +03:00 committed by GitHub
parent 86b5bc6a20
commit 785287c58f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 116 additions and 66 deletions

View File

@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
bool localOnly); bool localOnly);
static int GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc);
static int GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc);
static bool UnsetMetadataSyncedForAllWorkers(void); static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex, int columnIndex,
@ -1196,6 +1198,11 @@ ActivateNodeList(MetadataSyncContext *context)
void void
ActivateCloneNodeAsPrimary(WorkerNode *workerNode) ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
{ {
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
TupleDesc copiedTupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
table_close(pgDistNode, AccessShareLock);
/* /*
* Set the node as primary and active. * Set the node as primary and active.
*/ */
@ -1203,9 +1210,13 @@ ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
ObjectIdGetDatum(PrimaryNodeRoleId())); ObjectIdGetDatum(PrimaryNodeRoleId()));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true)); BoolGetDatum(true));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisclone, SetWorkerColumnLocalOnly(workerNode,
GetNodeIsCloneAttrIndexInPgDistNode(copiedTupleDescriptor) +
1,
BoolGetDatum(false)); BoolGetDatum(false));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid, SetWorkerColumnLocalOnly(workerNode,
GetNodePrimaryNodeIdAttrIndexInPgDistNode(
copiedTupleDescriptor) + 1,
Int32GetDatum(0)); Int32GetDatum(0));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata, SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true)); BoolGetDatum(true));
@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
{ {
const bool indexOK = true; const bool indexOK = true;
ScanKeyData scanKey[1];
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
ScanKeyData scanKey[1];
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId)); BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
newNodeName, newNodePort))); newNodeName, newNodePort)));
} }
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort); values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
isnull[Anum_pg_dist_node_nodeport - 1] = false; isnull[Anum_pg_dist_node_nodeport - 1] = false;
replace[Anum_pg_dist_node_nodeport - 1] = true; replace[Anum_pg_dist_node_nodeport - 1] = true;
@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistNode, NoLock); table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
} }
@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
Datum values[Natts_pg_dist_node]; Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool isnull[Natts_pg_dist_node]; bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool replace[Natts_pg_dist_node]; bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false); values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
isnull[Anum_pg_dist_node_metadatasynced - 1] = false; isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
replace[Anum_pg_dist_node_metadatasynced - 1] = true; replace[Anum_pg_dist_node_metadatasynced - 1] = true;
@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
table_close(pgDistNode, NoLock); table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -2831,9 +2847,9 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort); HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
Datum values[Natts_pg_dist_node]; Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool isnull[Natts_pg_dist_node]; bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool replace[Natts_pg_dist_node]; bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
if (heapTuple == NULL) if (heapTuple == NULL)
{ {
@ -2841,7 +2857,6 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
workerNode->workerName, workerNode->workerPort))); workerNode->workerName, workerNode->workerPort)));
} }
memset(replace, 0, sizeof(replace));
values[columnIndex - 1] = value; values[columnIndex - 1] = value;
isnull[columnIndex - 1] = false; isnull[columnIndex - 1] = false;
replace[columnIndex - 1] = true; replace[columnIndex - 1] = true;
@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
table_close(pgDistNode, NoLock); table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
return newWorkerNode; return newWorkerNode;
} }
@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void)
static void static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata) InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
{ {
Datum values[Natts_pg_dist_node]; Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
bool isNulls[Natts_pg_dist_node]; TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isNulls = palloc0(tupleDescriptor->natts * sizeof(bool));
Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster); Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum); Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
/* form new shard tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId); values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
@ -3264,14 +3282,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
nodeMetadata->shouldHaveShards); nodeMetadata->shouldHaveShards);
values[Anum_pg_dist_node_nodeisclone - 1] = BoolGetDatum( values[GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor)] =
nodeMetadata->nodeisclone); BoolGetDatum(nodeMetadata->nodeisclone);
values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum( values[GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor)] =
nodeMetadata->nodeprimarynodeid); Int32GetDatum(nodeMetadata->nodeprimarynodeid);
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple); CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple);
@ -3283,6 +3297,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
/* close relation */ /* close relation */
table_close(pgDistNode, NoLock); table_close(pgDistNode, NoLock);
pfree(values);
pfree(isNulls);
} }
@ -3397,43 +3414,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
1]); 1]);
/* /*
* Attributes above this line are guaranteed to be present at the * nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case
* exact defined attribute number. Atleast till now. If you are droping or * of extension creation/upgrade, master_initialize_node_metadata function is
* adding any of the above columns consider adjusting the code above * called before the nodecluster column is added to pg_dist_node table.
*/ */
Oid pgDistNodeRelId = RelationGetRelid(pgDistNode);
AttrNumber nodeClusterAttno = get_attnum(pgDistNodeRelId, "nodecluster"); if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
if (nodeClusterAttno > 0 &&
!TupleDescAttr(tupleDescriptor, nodeClusterAttno - 1)->attisdropped &&
!isNullArray[nodeClusterAttno - 1])
{ {
Name nodeClusterName = Name nodeClusterName =
DatumGetName(datumArray[nodeClusterAttno - 1]); DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
char *nodeClusterString = NameStr(*nodeClusterName); char *nodeClusterString = NameStr(*nodeClusterName);
strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN); strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
} }
if (nAtts > Anum_pg_dist_node_nodeisclone) int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor);
int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor);
if (!isNullArray[nodeIsCloneIdx])
{ {
AttrNumber nodeIsCloneAttno = get_attnum(pgDistNodeRelId, "nodeisclone"); workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneIdx]);
if (nodeIsCloneAttno > 0 && }
!TupleDescAttr(tupleDescriptor, nodeIsCloneAttno - 1)->attisdropped &&
!isNullArray[nodeIsCloneAttno - 1]) if (!isNullArray[nodePrimaryNodeIdIdx])
{ {
workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneAttno - 1]); workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[nodePrimaryNodeIdIdx]);
}
AttrNumber nodePrimaryNodeIdAttno = get_attnum(pgDistNodeRelId,
"nodeprimarynodeid");
if (nodePrimaryNodeIdAttno > 0 &&
!TupleDescAttr(tupleDescriptor, nodePrimaryNodeIdAttno - 1)->attisdropped &&
!isNullArray[nodePrimaryNodeIdAttno - 1])
{
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[
nodePrimaryNodeIdAttno - 1])
;
}
} }
pfree(datumArray); pfree(datumArray);
@ -3443,6 +3447,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
} }
/*
 * GetNodePrimaryNodeIdAttrIndexInPgDistNode returns the zero-based index of
 * the nodeprimarynodeid attribute in the given pg_dist_node tuple descriptor.
 *
 * Note that the result is an array index, not an attribute number: callers
 * that need the attribute number have to add 1 to it.
 *
 * nodeprimarynodeid was added to pg_dist_node via an alter operation after
 * the version where Citus started supporting downgrades, and it is one of
 * the two columns introduced to pg_dist_node since then.
 *
 * After a downgrade followed by an upgrade, tupleDesc->natts becomes greater
 * than Natts_pg_dist_node. In that case nodeprimarynodeid no longer sits at
 * Anum_pg_dist_node_nodeprimarynodeid but is the very last attribute, i.e.,
 * at index tupleDesc->natts - 1. Any attribute count other than
 * Natts_pg_dist_node is treated as that case.
 */
static int
GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc)
{
	if (tupleDesc->natts != Natts_pg_dist_node)
	{
		/* column layout differs from the compiled-in one: last attribute */
		return tupleDesc->natts - 1;
	}

	return Anum_pg_dist_node_nodeprimarynodeid - 1;
}
/*
 * GetNodeIsCloneAttrIndexInPgDistNode returns the zero-based index of the
 * nodeisclone attribute in the given pg_dist_node tuple descriptor.
 *
 * Note that the result is an array index, not an attribute number: callers
 * that need the attribute number have to add 1 to it.
 *
 * This mirrors GetNodePrimaryNodeIdAttrIndexInPgDistNode(), because
 * nodeisclone is also a column that was added to pg_dist_node after we
 * started supporting downgrades.
 *
 * The only difference is the position used when tupleDesc->natts is not
 * Natts_pg_dist_node: the two columns were added consecutively, nodeisclone
 * first and nodeprimarynodeid second, so nodeisclone is the second-to-last
 * attribute, i.e., at index tupleDesc->natts - 2.
 */
static int
GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc)
{
	if (tupleDesc->natts != Natts_pg_dist_node)
	{
		/* column layout differs from the compiled-in one: second-to-last */
		return tupleDesc->natts - 2;
	}

	return Anum_pg_dist_node_nodeisclone - 1;
}
/* /*
* StringToDatum transforms a string representation into a Datum. * StringToDatum transforms a string representation into a Datum.
*/ */
@ -3519,15 +3565,15 @@ UnsetMetadataSyncedForAllWorkers(void)
updatedAtLeastOne = true; updatedAtLeastOne = true;
} }
Datum *values = palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple)) while (HeapTupleIsValid(heapTuple))
{ {
Datum values[Natts_pg_dist_node]; memset(values, 0, tupleDescriptor->natts * sizeof(Datum));
bool isnull[Natts_pg_dist_node]; memset(isnull, 0, tupleDescriptor->natts * sizeof(bool));
bool replace[Natts_pg_dist_node]; memset(replace, 0, tupleDescriptor->natts * sizeof(bool));
memset(replace, false, sizeof(replace));
memset(isnull, false, sizeof(isnull));
memset(values, 0, sizeof(values));
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false); values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
replace[Anum_pg_dist_node_metadatasynced - 1] = true; replace[Anum_pg_dist_node_metadatasynced - 1] = true;
@ -3550,6 +3596,10 @@ UnsetMetadataSyncedForAllWorkers(void)
CatalogCloseIndexes(indstate); CatalogCloseIndexes(indstate);
table_close(relation, NoLock); table_close(relation, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
return updatedAtLeastOne; return updatedAtLeastOne;
} }