diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 58c8fbb07..90b2ae6cd 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly); +static int GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc); +static int GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc); static bool UnsetMetadataSyncedForAllWorkers(void); static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, @@ -1196,6 +1198,11 @@ ActivateNodeList(MetadataSyncContext *context) void ActivateCloneNodeAsPrimary(WorkerNode *workerNode) { + Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + TupleDesc copiedTupleDescriptor = CreateTupleDescCopy(tupleDescriptor); + table_close(pgDistNode, AccessShareLock); + /* * Set the node as primary and active. */ @@ -1203,9 +1210,13 @@ ActivateCloneNodeAsPrimary(WorkerNode *workerNode) ObjectIdGetDatum(PrimaryNodeRoleId())); SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(true)); - SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisclone, + SetWorkerColumnLocalOnly(workerNode, + GetNodeIsCloneAttrIndexInPgDistNode(copiedTupleDescriptor) + + 1, BoolGetDatum(false)); - SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid, + SetWorkerColumnLocalOnly(workerNode, + GetNodePrimaryNodeIdAttrIndexInPgDistNode( + copiedTupleDescriptor) + 1, Int32GetDatum(0)); SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum(true)); @@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca { const bool indexOK = true; - ScanKeyData scanKey[1]; - Datum values[Natts_pg_dist_node]; - bool isnull[Natts_pg_dist_node]; - bool replace[Natts_pg_dist_node]; - Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + ScanKeyData scanKey[1]; + Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool)); + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId)); @@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca newNodeName, newNodePort))); } - memset(replace, 0, sizeof(replace)); - values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort); isnull[Anum_pg_dist_node_nodeport - 1] = false; replace[Anum_pg_dist_node_nodeport - 1] = true; @@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca systable_endscan(scanDescriptor); table_close(pgDistNode, NoLock); + + pfree(values); + pfree(isnull); + pfree(replace); } @@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); - Datum values[Natts_pg_dist_node]; - bool isnull[Natts_pg_dist_node]; - bool replace[Natts_pg_dist_node]; + Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool)); - memset(replace, 0, sizeof(replace)); values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false); isnull[Anum_pg_dist_node_metadatasynced - 1] = false; replace[Anum_pg_dist_node_metadatasynced - 1] = true; @@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) table_close(pgDistNode, NoLock); + pfree(values); + pfree(isnull); + pfree(replace); + PG_RETURN_VOID(); } @@ -2831,9 +2847,9 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value) TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort); - Datum values[Natts_pg_dist_node]; - bool isnull[Natts_pg_dist_node]; - bool replace[Natts_pg_dist_node]; + Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool)); + bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool)); if (heapTuple == NULL) { @@ -2841,7 +2857,6 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value) workerNode->workerName, workerNode->workerPort))); } - memset(replace, 0, sizeof(replace)); values[columnIndex - 1] = value; isnull[columnIndex - 1] = false; replace[columnIndex - 1] = true; @@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value) table_close(pgDistNode, NoLock); + pfree(values); + pfree(isnull); + pfree(replace); + return newWorkerNode; } @@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void) static void InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata) { - Datum values[Natts_pg_dist_node]; - bool isNulls[Natts_pg_dist_node]; + Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + + Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isNulls = palloc0(tupleDescriptor->natts * sizeof(bool)); Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster); Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum); - /* form new shard tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid); values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId); values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName); @@ -3264,14 +3282,10 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum; values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum( nodeMetadata->shouldHaveShards); - values[Anum_pg_dist_node_nodeisclone - 1] = BoolGetDatum( - nodeMetadata->nodeisclone); - values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum( - nodeMetadata->nodeprimarynodeid); - - Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock); - - TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + values[GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor)] = + BoolGetDatum(nodeMetadata->nodeisclone); + values[GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor)] = + Int32GetDatum(nodeMetadata->nodeprimarynodeid); HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple); @@ -3283,6 +3297,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta /* close relation */ table_close(pgDistNode, NoLock); + + pfree(values); + pfree(isNulls); } @@ -3397,43 +3414,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap 1]); /* - * Attributes above this line are guaranteed to be present at the - * exact defined attribute number. Atleast till now. If you are droping or - * adding any of the above columns consider adjusting the code above + * nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case + * of extension creation/upgrade, master_initialize_node_metadata function is + * called before the nodecluster column is added to pg_dist_node table. */ - Oid pgDistNodeRelId = RelationGetRelid(pgDistNode); - AttrNumber nodeClusterAttno = get_attnum(pgDistNodeRelId, "nodecluster"); - - if (nodeClusterAttno > 0 && - !TupleDescAttr(tupleDescriptor, nodeClusterAttno - 1)->attisdropped && - !isNullArray[nodeClusterAttno - 1]) + if (!isNullArray[Anum_pg_dist_node_nodecluster - 1]) { Name nodeClusterName = - DatumGetName(datumArray[nodeClusterAttno - 1]); + DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]); char *nodeClusterString = NameStr(*nodeClusterName); strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN); } - if (nAtts > Anum_pg_dist_node_nodeisclone) + int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor); + int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor); + + if (!isNullArray[nodeIsCloneIdx]) { - AttrNumber nodeIsCloneAttno = get_attnum(pgDistNodeRelId, "nodeisclone"); - if (nodeIsCloneAttno > 0 && - !TupleDescAttr(tupleDescriptor, nodeIsCloneAttno - 1)->attisdropped && - !isNullArray[nodeIsCloneAttno - 1]) - { - workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneAttno - 1]); - } - AttrNumber nodePrimaryNodeIdAttno = get_attnum(pgDistNodeRelId, - "nodeprimarynodeid"); - if (nodePrimaryNodeIdAttno > 0 && - !TupleDescAttr(tupleDescriptor, nodePrimaryNodeIdAttno - 1)->attisdropped && - !isNullArray[nodePrimaryNodeIdAttno - 1]) - { - workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[ - nodePrimaryNodeIdAttno - 1]) - ; - } + workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneIdx]); + } + + if (!isNullArray[nodePrimaryNodeIdIdx]) + { + workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[nodePrimaryNodeIdIdx]); } pfree(datumArray); @@ -3443,6 +3447,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap } +/* + * GetNodePrimaryNodeIdAttrIndexInPgDistNode returns attrnum for nodeprimarynodeid attr. + * + * nodeprimarynodeid attr was added to table pg_dist_node using alter operation + * after the version where Citus started supporting downgrades, and it's one of + * the two columns that we've introduced to pg_dist_node since then. + * + * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than + * Natts_pg_dist_node and when this happens, then we know that attrnum + * nodeprimarynodeid is not Anum_pg_dist_node_nodeprimarynodeid anymore but + * tupleDesc->natts - 1. + */ +static int +GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc) +{ + return tupleDesc->natts == Natts_pg_dist_node + ? (Anum_pg_dist_node_nodeprimarynodeid - 1) + : tupleDesc->natts - 1; +} + + +/* + * GetNodeIsCloneAttrIndexInPgDistNode returns attrnum for nodeisclone attr. + * + * Like, GetNodePrimaryNodeIdAttrIndexInPgDistNode(), performs a similar + * calculation for nodeisclone attribute because this is column added to + * pg_dist_node after we started supporting downgrades. + * + * Only difference with the mentioned function is that we know + * the attrnum for nodeisclone is not Anum_pg_dist_node_nodeisclone anymore + * but tupleDesc->natts - 2 because we added these columns consecutively + * and we first add nodeisclone attribute and then nodeprimarynodeid attribute. + */ +static int +GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc) +{ + return tupleDesc->natts == Natts_pg_dist_node + ? (Anum_pg_dist_node_nodeisclone - 1) + : tupleDesc->natts - 2; +} + + /* * StringToDatum transforms a string representation into a Datum. */ @@ -3519,15 +3565,15 @@ UnsetMetadataSyncedForAllWorkers(void) updatedAtLeastOne = true; } + Datum *values = palloc(tupleDescriptor->natts * sizeof(Datum)); + bool *isnull = palloc(tupleDescriptor->natts * sizeof(bool)); + bool *replace = palloc(tupleDescriptor->natts * sizeof(bool)); + while (HeapTupleIsValid(heapTuple)) { - Datum values[Natts_pg_dist_node]; - bool isnull[Natts_pg_dist_node]; - bool replace[Natts_pg_dist_node]; - - memset(replace, false, sizeof(replace)); - memset(isnull, false, sizeof(isnull)); - memset(values, 0, sizeof(values)); + memset(values, 0, tupleDescriptor->natts * sizeof(Datum)); + memset(isnull, 0, tupleDescriptor->natts * sizeof(bool)); + memset(replace, 0, tupleDescriptor->natts * sizeof(bool)); values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false); replace[Anum_pg_dist_node_metadatasynced - 1] = true; @@ -3550,6 +3596,10 @@ UnsetMetadataSyncedForAllWorkers(void) CatalogCloseIndexes(indstate); table_close(relation, NoLock); + pfree(values); + pfree(isnull); + pfree(replace); + return updatedAtLeastOne; }