remove duplicate code in citus_dist_stat_activity (#3165)

pull/3157/head
SaitTalhaNisanci 2019-11-08 15:41:32 +03:00 committed by GitHub
parent 0b3d4e55d9
commit 02b359623f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 59 additions and 90 deletions

View File

@ -230,6 +230,8 @@ typedef struct CitusDistStat
static List * CitusStatActivity(const char *statQuery);
static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo);
static CitusDistStat * ParseCitusDistStat(PGresult *result, int64 rowIndex);
static void ReplaceInitiatorNodeIdentifier(int initiator_node_identifier,
CitusDistStat *citusDistStat);
/* utility functions to parse the fields from PGResult */
static text * ParseTextField(PGresult *result, int rowIndex, int colIndex);
@ -487,55 +489,12 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex)
{
CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat));
int initiator_node_identifier = 0;
WorkerNode *initiatorWorkerNode = NULL;
/*
* Replace initiator_node_identifier with initiator_node_hostname
* and initiator_node_port given that those are a lot more useful.
*
* The rules are following:
* - If initiator_node_identifier belongs to a worker, simply get it
* from the metadata
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on the coordinator, get the localhost
* and port
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on a worker node, manually mark it
* as "coordinator_host" given that we cannot know the host and port
* - If the initiator_node_identifier doesn't equal to zero, we know that
* it is a worker query initiated outside of a distributed
* transaction. However, we cannot know which node has initiated
* the worker query.
*/
initiator_node_identifier =
PQgetisnull(result, rowIndex, 0) ? -1 : ParseIntField(result, rowIndex, 0);
if (initiator_node_identifier > 0)
{
bool nodeExists = false;
initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
/* a query should run on an existing node */
Assert(nodeExists);
citusDistStat->master_query_host_name =
cstring_to_text(initiatorWorkerNode->workerName);
citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
}
else if (initiator_node_identifier == 0 && IsCoordinator())
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = PostPortNumber;
}
else if (initiator_node_identifier == 0)
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = 0;
}
else
{
citusDistStat->master_query_host_name = NULL;
citusDistStat->master_query_host_port = 0;
}
ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat);
citusDistStat->distributed_transaction_number = ParseIntField(result, rowIndex, 1);
citusDistStat->distributed_transaction_stamp =
@ -567,6 +526,60 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex)
}
static void
ReplaceInitiatorNodeIdentifier(int initiator_node_identifier,
CitusDistStat *citusDistStat)
{
WorkerNode *initiatorWorkerNode = NULL;
/*
* Replace initiator_node_identifier with initiator_node_hostname
* and initiator_node_port given that those are a lot more useful.
*
* The rules are following:
* - If initiator_node_identifier belongs to a worker, simply get it
* from the metadata
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on the coordinator, get the localhost
* and port
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on a worker node, manually mark it
* as "coordinator_host" given that we cannot know the host and port
* - If the initiator_node_identifier doesn't equal to zero, we know that
* it is a worker query initiated outside of a distributed
* transaction. However, we cannot know which node has initiated
* the worker query.
*/
if (initiator_node_identifier > 0)
{
bool nodeExists = false;
initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
/* a query should run on an existing node */
Assert(nodeExists);
citusDistStat->master_query_host_name =
cstring_to_text(initiatorWorkerNode->workerName);
citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
}
else if (initiator_node_identifier == 0 && IsCoordinator())
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = PostPortNumber;
}
else if (initiator_node_identifier == 0)
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = 0;
}
else
{
citusDistStat->master_query_host_name = NULL;
citusDistStat->master_query_host_port = 0;
}
}
/*
* LocalNodeCitusDistStat simply executes the given query via SPI and parses
* the results back in a list for further processing.
@ -658,54 +671,10 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor)
{
CitusDistStat *citusDistStat = (CitusDistStat *) palloc0(sizeof(CitusDistStat));
int initiator_node_identifier = 0;
WorkerNode *initiatorWorkerNode = NULL;
/*
* Replace initiator_node_identifier with initiator_node_hostname
* and initiator_node_port given that those are a lot more useful.
*
* The rules are following:
* - If initiator_node_identifier belongs to a worker, simply get it
* from the metadata
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on the coordinator, get the localhost
* and port
* - If the initiator_node_identifier belongs to the coordinator and
* we're executing the function on a worker node, manually mark it
* as "coordinator_host" given that we cannot know the host and port
* - If the initiator_node_identifier doesn't equal to zero, we know that
* it is a worker query initiated outside of a distributed
* transaction. However, we cannot know which node has initiated
* the worker query.
*/
initiator_node_identifier = ParseIntFieldFromHeapTuple(result, rowDescriptor, 1);
if (initiator_node_identifier > 0)
{
bool nodeExists = false;
initiatorWorkerNode = PrimaryNodeForGroup(initiator_node_identifier, &nodeExists);
/* a query should run on an existing node */
Assert(nodeExists);
citusDistStat->master_query_host_name =
cstring_to_text(initiatorWorkerNode->workerName);
citusDistStat->master_query_host_port = initiatorWorkerNode->workerPort;
}
else if (initiator_node_identifier == 0 && IsCoordinator())
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = PostPortNumber;
}
else if (initiator_node_identifier == 0)
{
citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name);
citusDistStat->master_query_host_port = 0;
}
else
{
citusDistStat->master_query_host_name = NULL;
citusDistStat->master_query_host_port = 0;
}
ReplaceInitiatorNodeIdentifier(initiator_node_identifier, citusDistStat);
citusDistStat->distributed_transaction_number =
ParseIntFieldFromHeapTuple(result, rowDescriptor, 2);