Move WorkerShardStats() to new transaction & connection framework.

That's a prerequisite for transactional COPY. Otherwise, for append
partitioned tables, the WorkerShardStats() triggered during COPY can't
see data. Which, in turn, prevents proper shard pruning.
pull/775/head
Andres Freund 2016-09-07 16:45:36 -07:00
parent 0850204bc5
commit 8a6a8fae9b
1 changed files with 29 additions and 31 deletions

View File

@ -16,18 +16,21 @@
#include "postgres.h" #include "postgres.h"
#include "funcapi.h" #include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
@ -582,11 +585,8 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
char *partitionColumnName = NULL; char *partitionColumnName = NULL;
StringInfo partitionValueQuery = makeStringInfo(); StringInfo partitionValueQuery = makeStringInfo();
int32 connectionId = -1; MultiConnection *connection = NULL;
bool queryOK = false; PGresult *result = NULL;
void *queryResult = NULL;
int rowCount = 0;
int columnCount = 0;
const int minValueIndex = 0; const int minValueIndex = 0;
const int maxValueIndex = 1; const int maxValueIndex = 1;
@ -600,11 +600,8 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
*shardMinValue = NULL; *shardMinValue = NULL;
*shardMaxValue = NULL; *shardMaxValue = NULL;
connectionId = MultiClientConnect(nodeName, nodePort, NULL, NULL); connection = GetNodeConnection(NEW_CONNECTION | CACHED_CONNECTION,
if (connectionId == INVALID_CONNECTION_ID) nodeName, nodePort);
{
return false;
}
quotedShardName = quote_literal_cstr(shardName); quotedShardName = quote_literal_cstr(shardName);
@ -618,18 +615,19 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName); appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName);
} }
queryOK = MultiClientExecute(connectionId, tableSizeQuery->data,
&queryResult, &rowCount, &columnCount); result = ExecuteStatement(connection, tableSizeQuery->data);
if (!queryOK) if (!result)
{ {
MultiClientDisconnect(connectionId);
return false; return false;
} }
tableSizeString = MultiClientGetValue(queryResult, 0, 0); tableSizeString = PQgetvalue(result, 0, 0);
if (tableSizeString == NULL) if (tableSizeString == NULL)
{ {
MultiClientDisconnect(connectionId); PQclear(result);
result = PQgetResult(connection->conn);
Assert(result == NULL);
return false; return false;
} }
@ -637,20 +635,21 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0); tableSize = strtoull(tableSizeString, &tableSizeStringEnd, 0);
if (errno != 0 || (*tableSizeStringEnd) != '\0') if (errno != 0 || (*tableSizeStringEnd) != '\0')
{ {
MultiClientClearResult(queryResult); PQclear(result);
MultiClientDisconnect(connectionId); result = PQgetResult(connection->conn);
Assert(result == NULL);
return false; return false;
} }
*shardSize = tableSize; *shardSize = tableSize;
MultiClientClearResult(queryResult); PQclear(result);
result = PQgetResult(connection->conn);
Assert(result == NULL);
if (partitionType != DISTRIBUTE_BY_APPEND) if (partitionType != DISTRIBUTE_BY_APPEND)
{ {
/* we don't need min/max for non-append distributed tables */ /* we don't need min/max for non-append distributed tables */
MultiClientDisconnect(connectionId);
return true; return true;
} }
@ -661,28 +660,27 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY, appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY,
partitionColumnName, partitionColumnName, shardName); partitionColumnName, partitionColumnName, shardName);
queryOK = MultiClientExecute(connectionId, partitionValueQuery->data, result = ExecuteStatement(connection, partitionValueQuery->data);
&queryResult, &rowCount, &columnCount); if (!result)
if (!queryOK)
{ {
MultiClientDisconnect(connectionId);
return false; return false;
} }
minValueIsNull = MultiClientValueIsNull(queryResult, 0, minValueIndex); minValueIsNull = PQgetisnull(result, 0, minValueIndex);
maxValueIsNull = MultiClientValueIsNull(queryResult, 0, maxValueIndex); maxValueIsNull = PQgetisnull(result, 0, maxValueIndex);
if (!minValueIsNull && !maxValueIsNull) if (!minValueIsNull && !maxValueIsNull)
{ {
char *minValueResult = MultiClientGetValue(queryResult, 0, minValueIndex); char *minValueResult = PQgetvalue(result, 0, minValueIndex);
char *maxValueResult = MultiClientGetValue(queryResult, 0, maxValueIndex); char *maxValueResult = PQgetvalue(result, 0, maxValueIndex);
*shardMinValue = cstring_to_text(minValueResult); *shardMinValue = cstring_to_text(minValueResult);
*shardMaxValue = cstring_to_text(maxValueResult); *shardMaxValue = cstring_to_text(maxValueResult);
} }
MultiClientClearResult(queryResult); PQclear(result);
MultiClientDisconnect(connectionId); result = PQgetResult(connection->conn);
Assert(result == NULL);
return true; return true;
} }