Use connection_management.c from within connection_cache.c.

This is a temporary step towards removing connection_cache.c.
pull/863/head
Andres Freund 2016-12-01 02:06:11 -08:00
parent 3505d431cd
commit a77cf36778
9 changed files with 60 additions and 200 deletions

View File

@ -329,13 +329,13 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
PQclear(queryResult);
/* close the connection */
PQfinish(masterConnection);
CloseConnectionByPGconn(masterConnection);
masterConnection = NULL;
}
PG_CATCH();
{
/* close the connection */
PQfinish(masterConnection);
CloseConnectionByPGconn(masterConnection);
masterConnection = NULL;
PG_RE_THROW();

View File

@ -111,7 +111,7 @@ static uint64 ReturnRowsFromTuplestore(uint64 tupleCount, TupleDesc tupleDescrip
Tuplestorestate *tupleStore);
static PGconn * GetConnectionForPlacement(ShardPlacement *placement,
bool isModificationQuery);
static void PurgeConnectionForPlacement(ShardPlacement *placement);
static void PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement);
static void RemoveXactConnection(PGconn *connection);
static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo,
Oid **parameterTypes,
@ -277,7 +277,7 @@ InitTransactionStateForTask(Task *task)
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
WarnRemoteError(connection, result);
PurgeConnection(connection);
CloseConnectionByPGconn(connection);
connection = NULL;
}
@ -794,7 +794,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo);
if (!queryOK)
{
PurgeConnectionForPlacement(taskPlacement);
PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
continue;
}
@ -852,7 +852,7 @@ ExecuteSingleTask(QueryDesc *queryDesc, Task *task,
}
else
{
PurgeConnectionForPlacement(taskPlacement);
PurgeConnectionForPlacement(connection, taskPlacement);
failedPlacementList = lappend(failedPlacementList, taskPlacement);
@ -956,8 +956,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
"commands")));
}
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
shardIntervalList = TaskShardIntervalList(taskList);
/* ensure that there are no concurrent modifications on the same shards */
@ -966,6 +964,8 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
/* open connection to all relevant placements, if not already open */
OpenTransactionsToAllShardPlacements(shardIntervalList, userName);
XactModificationLevel = XACT_MODIFICATION_MULTI_SHARD;
/* iterate over placements in rounds, to ensure in-order execution */
while (tasksPending)
{
@ -1234,17 +1234,9 @@ GetConnectionForPlacement(ShardPlacement *placement, bool isModificationQuery)
* for the transaction in addition to purging the connection cache's entry.
*/
static void
PurgeConnectionForPlacement(ShardPlacement *placement)
PurgeConnectionForPlacement(PGconn *connection, ShardPlacement *placement)
{
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
PurgeConnectionByKey(&nodeKey);
CloseConnectionByPGconn(connection);
/*
* The following is logically identical to RemoveXactConnection, but since
@ -1256,7 +1248,13 @@ PurgeConnectionForPlacement(ShardPlacement *placement)
{
NodeConnectionEntry *participantEntry = NULL;
bool entryFound = false;
NodeConnectionKey nodeKey;
char *currentUser = CurrentUserName();
MemSet(&nodeKey, 0, sizeof(NodeConnectionKey));
strlcpy(nodeKey.nodeName, placement->nodeName, MAX_NODE_LENGTH + 1);
nodeKey.nodePort = placement->nodePort;
strlcpy(nodeKey.nodeUser, currentUser, NAMEDATALEN);
Assert(IsTransactionBlock());
/* the participant hash doesn't use the user field */
@ -1862,7 +1860,7 @@ ExecuteTransactionEnd(bool commit)
else
{
WarnRemoteError(connection, result);
PurgeConnection(participant->connection);
CloseConnectionByPGconn(participant->connection);
participant->connection = NULL;
}

View File

@ -19,6 +19,7 @@
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
@ -269,7 +270,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
{
StoreErrorMessage(connection, queryResultString);
statusArray[commandIndex] = false;
PQfinish(connection);
CloseConnectionByPGconn(connection);
connectionArray[commandIndex] = NULL;
finishedCount++;
}
@ -298,7 +299,7 @@ ExecuteCommandsInParallelAndStoreResults(StringInfo *nodeNameArray, int *nodePor
finishedCount++;
statusArray[commandIndex] = success;
connectionArray[commandIndex] = NULL;
PQfinish(connection);
CloseConnectionByPGconn(connection);
}
}
@ -509,7 +510,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
PQclear(queryResult);
/* close the connection */
PQfinish(nodeConnection);
CloseConnectionByPGconn(nodeConnection);
nodeConnection = NULL;
}
PG_CATCH();
@ -517,7 +518,7 @@ ExecuteRemoteQueryOrCommand(char *nodeName, uint32 nodePort, char *queryString,
StoreErrorMessage(nodeConnection, queryResultString);
/* close the connection */
PQfinish(nodeConnection);
CloseConnectionByPGconn(nodeConnection);
nodeConnection = NULL;
}
PG_END_TRY();

View File

@ -21,6 +21,7 @@
#include "catalog/pg_type.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/metadata_cache.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
#include "utils/elog.h"
@ -124,7 +125,7 @@ get_and_purge_connection(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false);
}
PurgeConnection(connection);
CloseConnectionByPGconn(connection);
PG_RETURN_BOOL(true);
}
@ -148,7 +149,7 @@ connect_and_purge_connection(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(false);
}
PurgeConnection(connection);
CloseConnectionByPGconn(connection);
PG_RETURN_BOOL(true);
}

View File

@ -16,6 +16,7 @@
#include "distributed/colocation_utils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/connection_management.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_shard_transaction.h"
@ -361,6 +362,6 @@ CloseConnections(List *connectionList)
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PQfinish(connection);
CloseConnectionByPGconn(connection);
}
}

View File

@ -233,12 +233,12 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char
PQgetResult(workerConnection);
/* we no longer need this connection */
PQfinish(workerConnection);
CloseConnectionByPGconn(workerConnection);
}
PG_CATCH();
{
/* close the connection */
PQfinish(workerConnection);
CloseConnectionByPGconn(workerConnection);
PG_RE_THROW();
}
@ -280,7 +280,7 @@ RemoveWorkerTransaction(char *nodeName, int32 nodePort)
PGconn *connection = transactionConnection->connection;
/* closing the connection will rollback all uncommited transactions */
PQfinish(connection);
CloseConnectionByPGconn(connection);
workerConnectionList = list_delete(workerConnectionList, transactionConnection);
}

View File

@ -2,7 +2,7 @@
*
* connection_cache.c
*
* This file contains functions to implement a connection hash.
* Legacy connection caching layer. Will be removed entirely.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
@ -19,8 +19,10 @@
#include <string.h>
#include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/connection_cache.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "mb/pg_wchar.h"
#include "utils/builtins.h"
#include "utils/elog.h"
@ -29,15 +31,8 @@
#include "utils/memutils.h"
#include "utils/palloc.h"
/*
* NodeConnectionHash is the connection hash itself. It begins uninitialized.
* The first call to GetOrEstablishConnection triggers hash creation.
*/
static HTAB *NodeConnectionHash = NULL;
/* local function forward declarations */
static HTAB * CreateNodeConnectionHash(void);
static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError);
@ -56,77 +51,26 @@ static void ReportRemoteError(PGconn *connection, PGresult *result, bool raiseEr
PGconn *
GetOrEstablishConnection(char *nodeName, int32 nodePort)
{
int connectionFlags = SESSION_LIFESPAN;
PGconn *connection = NULL;
NodeConnectionKey nodeConnectionKey;
NodeConnectionEntry *nodeConnectionEntry = NULL;
bool entryFound = false;
bool needNewConnection = true;
char *userName = CurrentUserName();
MultiConnection *multiConnection =
GetNodeConnection(connectionFlags, nodeName, nodePort);
/* check input */
if (strnlen(nodeName, MAX_NODE_LENGTH + 1) > MAX_NODE_LENGTH)
if (PQstatus(multiConnection->pgConn) == CONNECTION_OK)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("hostname exceeds the maximum length of %d",
MAX_NODE_LENGTH)));
connection = multiConnection->pgConn;
}
/* if first call, initialize the connection hash */
if (NodeConnectionHash == NULL)
else
{
NodeConnectionHash = CreateNodeConnectionHash();
}
memset(&nodeConnectionKey, 0, sizeof(nodeConnectionKey));
strlcpy(nodeConnectionKey.nodeName, nodeName, MAX_NODE_LENGTH + 1);
nodeConnectionKey.nodePort = nodePort;
strlcpy(nodeConnectionKey.nodeUser, userName, NAMEDATALEN);
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_FIND, &entryFound);
if (entryFound)
{
connection = nodeConnectionEntry->connection;
if (PQstatus(connection) == CONNECTION_OK)
{
needNewConnection = false;
}
else
{
PurgeConnection(connection);
}
}
if (needNewConnection)
{
connection = ConnectToNode(nodeName, nodePort, nodeConnectionKey.nodeUser);
if (connection != NULL)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
HASH_ENTER, &entryFound);
nodeConnectionEntry->connection = connection;
}
ReportConnectionError(multiConnection, WARNING);
CloseConnection(multiConnection);
connection = NULL;
}
return connection;
}
/*
* PurgeConnection removes the given connection from the connection hash and
* closes it using PQfinish. If our hash does not contain the given connection,
* this method simply prints a warning and exits.
*/
void
PurgeConnection(PGconn *connection)
{
NodeConnectionKey nodeConnectionKey;
BuildKeyForConnection(connection, &nodeConnectionKey);
PurgeConnectionByKey(&nodeConnectionKey);
}
/*
* Utility method to simplify populating a connection cache key with relevant
* fields from a provided connection.
@ -170,29 +114,6 @@ BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey)
}
PGconn *
PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey)
{
bool entryFound = false;
NodeConnectionEntry *nodeConnectionEntry = NULL;
PGconn *connection = NULL;
if (NodeConnectionHash != NULL)
{
nodeConnectionEntry = hash_search(NodeConnectionHash, nodeConnectionKey,
HASH_REMOVE, &entryFound);
}
if (entryFound)
{
connection = nodeConnectionEntry->connection;
PQfinish(nodeConnectionEntry->connection);
}
return connection;
}
/*
* WarnRemoteError retrieves error fields from a remote result and produces an
* error report at the WARNING level after amending the error with a CONTEXT
@ -261,13 +182,11 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
}
/*
* If requested, actually raise an error. This necessitates purging the
* connection so it doesn't remain in the hash in an invalid state.
* If requested, actually raise an error.
*/
if (raiseError)
{
errorLevel = ERROR;
PurgeConnection(connection);
}
if (sqlState == ERRCODE_CONNECTION_FAILURE)
@ -288,79 +207,34 @@ ReportRemoteError(PGconn *connection, PGresult *result, bool raiseError)
}
/*
* CreateNodeConnectionHash returns a newly created hash table suitable for
* storing unlimited connections indexed by node name and port.
*/
static HTAB *
CreateNodeConnectionHash(void)
{
HTAB *nodeConnectionHash = NULL;
HASHCTL info;
int hashFlags = 0;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(NodeConnectionKey);
info.entrysize = sizeof(NodeConnectionEntry);
info.hash = tag_hash;
info.hcxt = CacheMemoryContext;
hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
nodeConnectionHash = hash_create("citus connection cache", 32, &info, hashFlags);
return nodeConnectionHash;
}
/*
* ConnectToNode opens a connection to a remote PostgreSQL server. The function
* configures the connection's fallback application name to 'citus' and sets
* the remote encoding to match the local one. All parameters are required to
* be non NULL.
*
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
* and return NULL.
* This is only a thin layer over connection_management.[ch], and will be
* removed soon.
*/
PGconn *
ConnectToNode(char *nodeName, int32 nodePort, char *nodeUser)
{
/* don't want already established connections */
int connectionFlags = FORCE_NEW_CONNECTION;
PGconn *connection = NULL;
const char *clientEncoding = GetDatabaseEncodingName();
const char *dbname = get_database_name(MyDatabaseId);
int attemptIndex = 0;
MultiConnection *multiConnection =
GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, nodeUser,
NULL);
const char *keywordArray[] = {
"host", "port", "fallback_application_name",
"client_encoding", "connect_timeout", "dbname", "user", NULL
};
char nodePortString[12];
const char *valueArray[] = {
nodeName, nodePortString, "citus", clientEncoding,
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, nodeUser, NULL
};
sprintf(nodePortString, "%d", nodePort);
Assert(sizeof(keywordArray) == sizeof(valueArray));
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
if (PQstatus(multiConnection->pgConn) == CONNECTION_OK)
{
connection = PQconnectdbParams(keywordArray, valueArray, false);
if (PQstatus(connection) == CONNECTION_OK)
{
break;
}
else
{
/* warn if still erroring on final attempt */
if (attemptIndex == MAX_CONNECT_ATTEMPTS - 1)
{
WarnRemoteError(connection, NULL);
}
PQfinish(connection);
connection = NULL;
}
connection = multiConnection->pgConn;
}
else
{
ReportConnectionError(multiConnection, WARNING);
CloseConnection(multiConnection);
connection = NULL;
}
return connection;

View File

@ -16,21 +16,8 @@
#include "c.h"
#include "libpq-fe.h"
#include "nodes/pg_list.h"
#include "utils/hsearch.h"
/* maximum duration to wait for connection */
#define CLIENT_CONNECT_TIMEOUT_SECONDS "5"
/* maximum (textual) lengths of hostname and port */
#define MAX_NODE_LENGTH 255
#define MAX_PORT_LENGTH 10
/* times to attempt connection (or reconnection) */
#define MAX_CONNECT_ATTEMPTS 2
/* SQL statement for testing */
#define TEST_SQL "DO $$ BEGIN RAISE EXCEPTION 'Raised remotely!'; END $$"
#include "distributed/connection_management.h"
/*
* NodeConnectionKey acts as the key to index into the (process-local) hash
@ -54,9 +41,7 @@ typedef struct NodeConnectionEntry
/* function declarations for obtaining and using a connection */
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
extern void PurgeConnection(PGconn *connection);
extern void BuildKeyForConnection(PGconn *connection, NodeConnectionKey *connectionKey);
extern PGconn * PurgeConnectionByKey(NodeConnectionKey *nodeConnectionKey);
extern void WarnRemoteError(PGconn *connection, PGresult *result);
extern void ReraiseRemoteError(PGconn *connection, PGresult *result);
extern PGconn * ConnectToNode(char *nodeName, int nodePort, char *nodeUser);

View File

@ -32,7 +32,7 @@ CREATE FUNCTION set_connection_status_bad(cstring, integer)
\set VERBOSITY terse
-- connect to non-existent host
SELECT initialize_remote_temp_table('dummy-host-name', 12345);
WARNING: connection failed to dummy-host-name:12345
WARNING: connection error: dummy-host-name:12345
initialize_remote_temp_table
------------------------------
f