From 67ee357d7f68f6d8d4eb235ecd1762662d0f7a13 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Thu, 12 Jan 2017 20:28:19 +0300 Subject: [PATCH] Port WorkerShardStats to new connection API Part of the work in citusdata/citus#1101, this is a pretty direct port over to the new functions and shouldn't result in any behavior changes. --- .../distributed/connection/remote_commands.c | 2 +- .../master/master_stage_protocol.c | 65 +++++++++---------- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index 1b716e017..a99a24870 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -315,7 +315,7 @@ SendRemoteCommand(MultiConnection *connection, const char *command) /* - * GetCommandResult is a wrapper around PQgetResult() that handles interrupts. + * GetRemoteCommandResult is a wrapper around PQgetResult() that handles interrupts. * * If raiseInterrupts is true and an interrupt arrives, e.g. the query is * being cancelled, CHECK_FOR_INTERRUPTS() will be called, which then throws diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 3474e2bcb..e79c38d8e 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -17,6 +17,7 @@ #include "postgres.h" #include "funcapi.h" #include "miscadmin.h" +#include "libpq-fe.h" #include "access/htup_details.h" #include "access/xact.h" @@ -24,6 +25,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "distributed/colocation_utils.h" +#include "distributed/connection_management.h" #include "distributed/multi_client_executor.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" @@ -31,6 +33,8 @@ #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/placement_connection.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -637,9 +641,6 @@ static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue) { - char *nodeName = placement->nodeName; - uint32 nodePort = placement->nodePort; - char *quotedShardName = NULL; bool cstoreTable = false; StringInfo tableSizeQuery = makeStringInfo(); @@ -650,11 +651,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, char *partitionColumnName = NULL; StringInfo partitionValueQuery = makeStringInfo(); - int32 connectionId = -1; - bool queryOK = false; - void *queryResult = NULL; - int rowCount = 0; - int columnCount = 0; + PGresult *queryResult = NULL; const int minValueIndex = 0; const int maxValueIndex = 1; @@ -664,16 +661,16 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, bool minValueIsNull = false; bool maxValueIsNull = false; + int connectionFlags = 0; + int executeCommand = 0; + + MultiConnection *connection = GetPlacementConnection(connectionFlags, placement, + NULL); + *shardSize = 0; *shardMinValue = NULL; *shardMaxValue = NULL; - connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL); - if (connectionId == INVALID_CONNECTION_ID) - { - return false; - } - quotedShardName = quote_literal_cstr(shardName); cstoreTable = CStoreTable(relationId); @@ -686,18 +683,18 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName); } - queryOK = MultiClientExecute(connectionId, tableSizeQuery->data, - &queryResult, &rowCount, &columnCount); - if (!queryOK) + executeCommand = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data, + &queryResult); + if (executeCommand != 0) { - MultiClientDisconnect(connectionId); return false; } - tableSizeString = MultiClientGetValue(queryResult, 0, 0); + tableSizeString = PQgetvalue(queryResult, 0, 0); if (tableSizeString == NULL) { - MultiClientDisconnect(connectionId); + PQclear(queryResult); + ForgetResults(connection); return false; } @@ -705,20 +702,19 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0); if (errno != 0 || (*tableSizeStringEnd) != '\0') { - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + PQclear(queryResult); + ForgetResults(connection); return false; } *shardSize = tableSize; - MultiClientClearResult(queryResult); + PQclear(queryResult); + ForgetResults(connection); if (partitionType != DISTRIBUTE_BY_APPEND) { /* we don't need min/max for non-append distributed tables */ - MultiClientDisconnect(connectionId); - return true; } @@ -729,28 +725,27 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, char *shardName, appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY, partitionColumnName, partitionColumnName, shardName); - queryOK = MultiClientExecute(connectionId, partitionValueQuery->data, - &queryResult, &rowCount, &columnCount); - if (!queryOK) + executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data, + &queryResult); + if (executeCommand != 0) { - MultiClientDisconnect(connectionId); return false; } - minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex); - maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex); + minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex); + maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex); if (!minValueIsNull && !maxValueIsNull) { - char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex); - char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex); + char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex); + char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex); *shardMinValue = cstring_to_text(minValueResult); *shardMaxValue = cstring_to_text(maxValueResult); } - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + PQclear(queryResult); + ForgetResults(connection); return true; }