From 02b359623f40fb251c866ff5a9d55a0232c93b34 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 8 Nov 2019 15:41:32 +0300 Subject: [PATCH] remove duplicate code in citus_dist_stat_activity (#3165) --- .../transaction/citus_dist_stat_activity.c | 149 +++++++----------- 1 file changed, 59 insertions(+), 90 deletions(-) diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 2901b7879..3226a97aa 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -230,6 +230,8 @@ typedef struct CitusDistStat static List * CitusStatActivity(const char *statQuery); static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo); static CitusDistStat * ParseCitusDistStat(PGresult *result, int64 rowIndex); +static void ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, + CitusDistStat *citusDistStat); /* utility functions to parse the fields from PGResult */ static text * ParseTextField(PGresult *result, int rowIndex, int colIndex); @@ -487,55 +489,12 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex) { CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat)); int initiator_node_identifier = 0; - WorkerNode *initiatorWorkerNode = NULL; - /* - * Replace initiator_node_identifier with initiator_node_hostname - * and initiator_node_port given that those are a lot more useful. - * - * The rules are following: - * - If initiator_node_identifier belongs to a worker, simply get it - * from the metadata - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on the coordinator, get the localhost - * and port - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on a worker node, manually mark it - * as "coordinator_host" given that we cannot know the host and port - * - If the initiator_node_identifier doesn't equal to zero, we know that - * it is a worker query initiated outside of a distributed - * transaction. However, we cannot know which node has initiated - * the worker query. - */ + initiator_node_identifier = PQgetisnull(result, rowIndex, 0) ? -1 : ParseIntField(result, rowIndex, 0); - if (initiator_node_identifier > 0) - { - bool nodeExists = false; - initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); - - /* a query should run on an existing node */ - Assert(nodeExists); - citusDistStat->master_query_host_name = - cstring_to_text(initiatorWorkerNode->workerName); - citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; - } - else if (initiator_node_identifier == 0 && IsCoordinator()) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = PostPortNumber; - } - else if (initiator_node_identifier == 0) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = 0; - } - else - { - citusDistStat->master_query_host_name = NULL; - citusDistStat->master_query_host_port = 0; - } + ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat); citusDistStat->distributed_transaction_number = ParseIntField(result, rowIndex, 1); citusDistStat->distributed_transaction_stamp = @@ -567,6 +526,60 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex) } +static void +ReplaceInitiatorNodeIdentifier(int initiator_node_identifier, + CitusDistStat *citusDistStat) +{ + WorkerNode *initiatorWorkerNode = NULL; + + /* + * Replace initiator_node_identifier with initiator_node_hostname + * and initiator_node_port given that those are a lot more useful. + * + * The rules are following: + * - If initiator_node_identifier belongs to a worker, simply get it + * from the metadata + * - If the initiator_node_identifier belongs to the coordinator and + * we're executing the function on the coordinator, get the localhost + * and port + * - If the initiator_node_identifier belongs to the coordinator and + * we're executing the function on a worker node, manually mark it + * as "coordinator_host" given that we cannot know the host and port + * - If the initiator_node_identifier doesn't equal to zero, we know that + * it is a worker query initiated outside of a distributed + * transaction. However, we cannot know which node has initiated + * the worker query. + */ + if (initiator_node_identifier > 0) + { + bool nodeExists = false; + + initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); + + /* a query should run on an existing node */ + Assert(nodeExists); + citusDistStat->master_query_host_name = + cstring_to_text(initiatorWorkerNode->workerName); + citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; + } + else if (initiator_node_identifier == 0 && IsCoordinator()) + { + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = PostPortNumber; + } + else if (initiator_node_identifier == 0) + { + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = 0; + } + else + { + citusDistStat->master_query_host_name = NULL; + citusDistStat->master_query_host_port = 0; + } +} + + /* * LocalNodeCitusDistStat simply executes the given query via SPI and parses * the results back in a list for further processing. @@ -658,54 +671,10 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor) { CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat)); int initiator_node_identifier = 0; - WorkerNode *initiatorWorkerNode = NULL; - /* - * Replace initiator_node_identifier with initiator_node_hostname - * and initiator_node_port given that those are a lot more useful. - * - * The rules are following: - * - If initiator_node_identifier belongs to a worker, simply get it - * from the metadata - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on the coordinator, get the localhost - * and port - * - If the initiator_node_identifier belongs to the coordinator and - * we're executing the function on a worker node, manually mark it - * as "coordinator_host" given that we cannot know the host and port - * - If the initiator_node_identifier doesn't equal to zero, we know that - * it is a worker query initiated outside of a distributed - * transaction. However, we cannot know which node has initiated - * the worker query. - */ initiator_node_identifier = ParseIntFieldFromHeapTuple(result, rowDescriptor, 1); - if (initiator_node_identifier > 0) - { - bool nodeExists = false; - initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists); - - /* a query should run on an existing node */ - Assert(nodeExists); - citusDistStat->master_query_host_name = - cstring_to_text(initiatorWorkerNode->workerName); - citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort; - } - else if (initiator_node_identifier == 0 && IsCoordinator()) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = PostPortNumber; - } - else if (initiator_node_identifier == 0) - { - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); - citusDistStat->master_query_host_port = 0; - } - else - { - citusDistStat->master_query_host_name = NULL; - citusDistStat->master_query_host_port = 0; - } + ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat); citusDistStat->distributed_transaction_number = ParseIntFieldFromHeapTuple(result, rowDescriptor, 2);