Port WorkerShardStats to new connection API

Part of the work in citusdata/citus#1101, this is a pretty direct port
over to the new functions and shouldn't result in any behavior changes.
pull/1139/head
Brian Cloutier 2017-01-12 20:28:19 +03:00 committed by Brian Cloutier
parent b1b2b4fadf
commit 67ee357d7f
2 changed files with 31 additions and 36 deletions

View File

@ -315,7 +315,7 @@ SendRemoteCommand(MultiConnection *connection, const char *command)
/*
* GetCommandResult is a wrapper around PQgetResult() that handles interrupts.
* GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts.
*
* If raiseInterrupts is true and an interrupt arrives, e.g. the query is
* being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws

View File

@ -17,6 +17,7 @@
#include "postgres.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "libpq-fe.h"
#include "access/htup_details.h"
#include "access/xact.h"
@ -24,6 +25,7 @@
#include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "distributed/colocation_utils.h"
#include "distributed/connection_management.h"
#include "distributed/multi_client_executor.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
@ -31,6 +33,8 @@
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
@ -637,9 +641,6 @@ static bool
WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
uint64 *shardSize, text **shardMinValue, text **shardMaxValue)
{
char *nodeName = placement->nodeName;
uint32 nodePort = placement->nodePort;
char *quotedShardName = NULL;
bool cstoreTable = false;
StringInfo tableSizeQuery = makeStringInfo();
@ -650,11 +651,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
char *partitionColumnName = NULL;
StringInfo partitionValueQuery = makeStringInfo();
int32 connectionId = -1;
bool queryOK = false;
void *queryResult = NULL;
int rowCount = 0;
int columnCount = 0;
PGresult *queryResult = NULL;
const int minValueIndex = 0;
const int maxValueIndex = 1;
@ -664,16 +661,16 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
bool minValueIsNull = false;
bool maxValueIsNull = false;
int connectionFlags = 0;
int executeCommand = 0;
MultiConnection *connection = GetPlacementConnection(connectionFlags, placement,
NULL);
*shardSize = 0;
*shardMinValue = NULL;
*shardMaxValue = NULL;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL);
if (connectionId == INVALID_CONNECTION_ID)
{
return false;
}
quotedShardName = quote_literal_cstr(shardName);
cstoreTable = CStoreTable(relationId);
@ -686,18 +683,18 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
}
queryOK = MultiClientExecute(connectionId, tableSizeQuery->data,
&queryResult, &rowCount, &columnCount);
if (!queryOK)
executeCommand = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
&queryResult);
if (executeCommand != 0)
{
MultiClientDisconnect(connectionId);
return false;
}
tableSizeString = MultiClientGetValue(queryResult, 0, 0);
tableSizeString = PQgetvalue(queryResult, 0, 0);
if (tableSizeString == NULL)
{
MultiClientDisconnect(connectionId);
PQclear(queryResult);
ForgetResults(connection);
return false;
}
@ -705,20 +702,19 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0);
if (errno != 0 || (*tableSizeStringEnd) != '\0')
{
MultiClientClearResult(queryResult);
MultiClientDisconnect(connectionId);
PQclear(queryResult);
ForgetResults(connection);
return false;
}
*shardSize = tableSize;
MultiClientClearResult(queryResult);
PQclear(queryResult);
ForgetResults(connection);
if (partitionType != DISTRIBUTE_BY_APPEND)
{
/* we don't need min/max for non-append distributed tables */
MultiClientDisconnect(connectionId);
return true;
}
@ -729,28 +725,27 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName,
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
partitionColumnName, partitionColumnName, shardName);
queryOK = MultiClientExecute(connectionId, partitionValueQuery->data,
&queryResult, &rowCount, &columnCount);
if (!queryOK)
executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data,
&queryResult);
if (executeCommand != 0)
{
MultiClientDisconnect(connectionId);
return false;
}
minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex);
maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex);
minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex);
maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex);
if (!minValueIsNull && !maxValueIsNull)
{
char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex);
char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex);
char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex);
char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex);
*shardMinValue = cstring_to_text(minValueResult);
*shardMaxValue = cstring_to_text(maxValueResult);
}
MultiClientClearResult(queryResult);
MultiClientDisconnect(connectionId);
PQclear(queryResult);
ForgetResults(connection);
return true;
}