From 8a6a8fae9b4e767d6869d7c4b9574812892ee178 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 7 Sep 2016 16:45:36 -0700 Subject: [PATCH] Move WorkerShardStats() to new transaction & connection framework. That's a prerequisite for transactional COPY. Otherwise, for append partitioned tables, the WorkerShardStats() triggered during COPY can't see data. Which, in turn, prevents proper shard pruning. --- .../master/master_stage_protocol.c | 60 +++++++++---------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index fa916c10a..f77184496 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -16,18 +16,21 @@ #include "postgres.h" #include "funcapi.h" +#include "libpq-fe.h" #include "miscadmin.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/indexing.h" #include "distributed/multi_client_executor.h" +#include "distributed/connection_management.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_shard.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -582,11 +585,8 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam char *partitionColumnName = NULL; StringInfo partitionValueQuery = makeStringInfo(); - int32 connectionId = -1; - bool queryOK = false; - void *queryResult = NULL; - int rowCount = 0; - int columnCount = 0; + MultiConnection *connection = NULL; + PGresult *result = NULL; const int minValueIndex = 0; const int maxValueIndex = 1; @@ -600,11 +600,8 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam *shardMinValue = NULL; *shardMaxValue = NULL; - connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL); - if (connectionId == INVALID_CONNECTION_ID) - { - return false; - } + connection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION, + nodeName, nodePort); quotedShardName = quote_literal_cstr(shardName); @@ -618,18 +615,19 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName); } - queryOK = MultiClientExecute(connectionId, tableSizeQuery->data, - &queryResult, &rowCount, &columnCount); - if (!queryOK) + + result = ExecuteStatement(connection, tableSizeQuery->data); + if (!result) { - MultiClientDisconnect(connectionId); return false; } - tableSizeString = MultiClientGetValue(queryResult, 0, 0); + tableSizeString = PQgetvalue(result, 0, 0); if (tableSizeString == NULL) { - MultiClientDisconnect(connectionId); + PQclear(result); + result = PQgetResult(connection->conn); + Assert(result == NULL); return false; } @@ -637,20 +635,21 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0); if (errno != 0 || (*tableSizeStringEnd) != '\0') { - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + PQclear(result); + result = PQgetResult(connection->conn); + Assert(result == NULL); return false; } *shardSize = tableSize; - MultiClientClearResult(queryResult); + PQclear(result); + result = PQgetResult(connection->conn); + Assert(result == NULL); if (partitionType != DISTRIBUTE_BY_APPEND) { /* we don't need min/max for non-append distributed tables */ - MultiClientDisconnect(connectionId); - return true; } @@ -661,28 +660,27 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY, partitionColumnName, partitionColumnName, shardName); - queryOK = MultiClientExecute(connectionId, partitionValueQuery->data, - &queryResult, &rowCount, &columnCount); - if (!queryOK) + result = ExecuteStatement(connection, partitionValueQuery->data); + if (!result) { - MultiClientDisconnect(connectionId); return false; } - minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex); - maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex); + minValueIsNull = PQgetisnull(result, 0, minValueIndex); + maxValueIsNull = PQgetisnull(result, 0, maxValueIndex); if (!minValueIsNull && !maxValueIsNull) { - char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex); - char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex); + char *minValueResult = PQgetvalue(result, 0, minValueIndex); + char *maxValueResult = PQgetvalue(result, 0, maxValueIndex); *shardMinValue = cstring_to_text(minValueResult); *shardMaxValue = cstring_to_text(maxValueResult); } - MultiClientClearResult(queryResult); - MultiClientDisconnect(connectionId); + PQclear(result); + result = PQgetResult(connection->conn); + Assert(result == NULL); return true; }